4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
58 #define ERRCODE_UNDEFINED_TABLE "42P01"
61 * Multi-platform pthread implementations
65 /* Use native win32 threads on Windows */
66 typedef struct win32_pthread *pthread_t;
67 typedef int pthread_attr_t;
69 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
70 static int pthread_join(pthread_t th, void **thread_return);
71 #elif defined(ENABLE_THREAD_SAFETY)
72 /* Use platform-dependent pthread capability */
75 /* No threads implementation, use none (-j 1) */
76 #define pthread_t void *
80 /********************************************************************
81 * some configurable parameters */
83 /* max number of clients allowed */
85 #define MAXCLIENTS (FD_SETSIZE - 10)
87 #define MAXCLIENTS 1024
90 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
91 #define DEFAULT_NXACTS 10 /* default nxacts */
93 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
95 int nxacts = 0; /* number of transactions per client */
96 int duration = 0; /* duration in seconds */
99 * scaling factor. for example, scale = 10 will make 1000000 tuples in
100 * pgbench_accounts table.
105 * fillfactor. for example, fillfactor = 90 will use only 90 percent
106 * space during inserts and leave 10 percent free.
108 int fillfactor = 100;
111 * create foreign key constraints on the tables?
113 int foreign_keys = 0;
116 * use unlogged tables?
118 int unlogged_tables = 0;
121 * log sampling rate (1.0 = log everything, 0.0 = option not given)
123 double sample_rate = 0.0;
126 * When threads are throttled to a given rate limit, this is the target delay
127 * to reach that rate in usec. 0 is the default and means no throttling.
129 int64 throttle_delay = 0;
132 * Transactions which take longer than this limit (in usec) are counted as
133 * late, and reported as such, although they are completed anyway. When
134 * throttling is enabled, execution time slots that are more than this late
135 * are skipped altogether, and counted separately.
137 int64 latency_limit = 0;
140 * tablespace selection
142 char *tablespace = NULL;
143 char *index_tablespace = NULL;
146 * end of configurable parameters
147 *********************************************************************/
149 #define nbranches 1 /* Makes little sense to change this. Change
152 #define naccounts 100000
155 * The scale factor at/beyond which 32bit integers are incapable of storing
158 * Although the actual threshold is 21474, we use 20000 because it is easier to
159 * document and remember, and isn't that far away from the real threshold.
161 #define SCALE_32BIT_THRESHOLD 20000
163 bool use_log; /* log transaction latencies to a file */
164 bool use_quiet; /* quiet logging onto stderr */
165 int agg_interval; /* log aggregates instead of individual
167 int progress = 0; /* thread progress report every this seconds */
168 int progress_nclients = 0; /* number of clients for progress
170 int progress_nthreads = 0; /* number of threads for progress
172 bool is_connect; /* establish connection for each transaction */
173 bool is_latencies; /* report per-command latencies */
174 int main_pid; /* main process id used in log filename */
180 const char *progname;
182 volatile bool timer_exceeded = false; /* flag from signal handler */
184 /* variable definitions */
187 char *name; /* variable name */
188 char *value; /* its value */
191 #define MAX_FILES 128 /* max number of SQL script files allowed */
192 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
195 * structures used in custom query mode
200 PGconn *con; /* connection handle to DB */
201 int id; /* client No. */
202 int state; /* state No. */
203 int listen; /* 0 indicates that an async query has been
205 int sleeping; /* 1 indicates that the client is napping */
206 bool throttling; /* whether nap is for throttling */
207 Variable *variables; /* array of variable definitions */
209 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
210 instr_time txn_begin; /* used for measuring schedule lag times */
211 instr_time stmt_begin; /* used for measuring statement latencies */
212 bool is_throttled; /* whether transaction throttling is done */
213 int use_file; /* index in sql_files for this client */
214 bool prepared[MAX_FILES];
216 /* per client collected stats */
217 int cnt; /* xacts count */
218 int ecnt; /* error count */
219 int64 txn_latencies; /* cumulated latencies */
220 int64 txn_sqlats; /* cumulated square latencies */
228 int tid; /* thread id */
229 pthread_t thread; /* thread handle */
230 CState *state; /* array of CState */
231 int nstate; /* length of state[] */
232 instr_time start_time; /* thread start time */
233 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
234 int *exec_count; /* number of cmd executions (per Command) */
235 unsigned short random_state[3]; /* separate randomness for each thread */
236 int64 throttle_trigger; /* previous/next throttling (us) */
238 /* per thread collected stats */
239 instr_time conn_time;
240 int64 throttle_lag; /* total transaction lag behind throttling */
241 int64 throttle_lag_max; /* max transaction lag */
242 int64 throttle_latency_skipped; /* lagging transactions
244 int64 latency_late; /* late transactions */
247 #define INVALID_THREAD ((pthread_t) 0)
250 * queries read from files
252 #define SQL_COMMAND 1
253 #define META_COMMAND 2
256 typedef enum QueryMode
258 QUERY_SIMPLE, /* simple query */
259 QUERY_EXTENDED, /* extended query */
260 QUERY_PREPARED, /* extended query with prepared statements */
264 static QueryMode querymode = QUERY_SIMPLE;
265 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
269 char *line; /* full text of command line */
270 int command_num; /* unique index of this Command struct */
271 int type; /* command type (SQL_COMMAND or META_COMMAND) */
272 int argc; /* number of command words */
273 char *argv[MAX_ARGS]; /* command word list */
274 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
275 PgBenchExpr *expr; /* parsed expression */
281 long start_time; /* when does the interval start */
282 int cnt; /* number of transactions */
283 int skipped; /* number of transactions skipped under --rate
284 * and --latency-limit */
286 double min_latency; /* min/max latencies */
288 double sum_latency; /* sum(latency), sum(latency^2) - for
294 double sum_lag; /* sum(lag) */
295 double sum2_lag; /* sum(lag*lag) */
298 static Command **sql_files[MAX_FILES]; /* SQL script files */
299 static int num_files; /* number of script files */
300 static int num_commands = 0; /* total number of Command structs */
301 static int debug = 0; /* debug flag */
303 /* default scenario */
304 static char *tpc_b = {
305 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
306 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
307 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
308 "\\setrandom aid 1 :naccounts\n"
309 "\\setrandom bid 1 :nbranches\n"
310 "\\setrandom tid 1 :ntellers\n"
311 "\\setrandom delta -5000 5000\n"
313 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
314 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
315 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
316 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
317 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
322 static char *simple_update = {
323 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
324 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
325 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
326 "\\setrandom aid 1 :naccounts\n"
327 "\\setrandom bid 1 :nbranches\n"
328 "\\setrandom tid 1 :ntellers\n"
329 "\\setrandom delta -5000 5000\n"
331 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
332 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
333 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
338 static char *select_only = {
339 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
340 "\\setrandom aid 1 :naccounts\n"
341 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
344 /* Function prototypes */
345 static void setalarm(int seconds);
346 static void *threadRun(void *arg);
348 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
349 AggVals *agg, bool skipped);
354 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
356 " %s [OPTION]... [DBNAME]\n"
357 "\nInitialization options:\n"
358 " -i, --initialize invokes initialization mode\n"
359 " -F, --fillfactor=NUM set fill factor\n"
360 " -n, --no-vacuum do not run VACUUM after initialization\n"
361 " -q, --quiet quiet logging (one message each 5 seconds)\n"
362 " -s, --scale=NUM scaling factor\n"
363 " --foreign-keys create foreign key constraints between tables\n"
364 " --index-tablespace=TABLESPACE\n"
365 " create indexes in the specified tablespace\n"
366 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
367 " --unlogged-tables create tables as unlogged tables\n"
368 "\nBenchmarking options:\n"
369 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
370 " -C, --connect establish new connection for each transaction\n"
371 " -D, --define=VARNAME=VALUE\n"
372 " define variable for use by custom script\n"
373 " -f, --file=FILENAME read transaction script from FILENAME\n"
374 " -j, --jobs=NUM number of threads (default: 1)\n"
375 " -l, --log write transaction times to log file\n"
376 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
377 " -M, --protocol=simple|extended|prepared\n"
378 " protocol for submitting queries (default: simple)\n"
379 " -n, --no-vacuum do not run VACUUM before tests\n"
380 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
381 " -P, --progress=NUM show thread progress report every NUM seconds\n"
382 " -r, --report-latencies report average latency per command\n"
383 " -R, --rate=NUM target rate in transactions per second\n"
384 " -s, --scale=NUM report this scale factor in output\n"
385 " -S, --select-only perform SELECT-only transactions\n"
386 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
387 " -T, --time=NUM duration of benchmark test in seconds\n"
388 " -v, --vacuum-all vacuum all four standard tables before tests\n"
389 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
390 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
391 "\nCommon options:\n"
392 " -d, --debug print debugging output\n"
393 " -h, --host=HOSTNAME database server host or socket directory\n"
394 " -p, --port=PORT database server port number\n"
395 " -U, --username=USERNAME connect as specified database user\n"
396 " -V, --version output version information, then exit\n"
397 " -?, --help show this help, then exit\n"
399 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
404 * strtoint64 -- convert a string to 64-bit integer
406 * This function is a modified version of scanint8() from
407 * src/backend/utils/adt/int8.c.
410 strtoint64(const char *str)
412 const char *ptr = str;
417 * Do our own scan, rather than relying on sscanf which might be broken
421 /* skip leading spaces */
422 while (*ptr && isspace((unsigned char) *ptr))
431 * Do an explicit check for INT64_MIN. Ugly though this is, it's
432 * cleaner than trying to get the loop below to handle it portably.
434 if (strncmp(ptr, "9223372036854775808", 19) == 0)
436 result = PG_INT64_MIN;
442 else if (*ptr == '+')
445 /* require at least one digit */
446 if (!isdigit((unsigned char) *ptr))
447 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
450 while (*ptr && isdigit((unsigned char) *ptr))
452 int64 tmp = result * 10 + (*ptr++ - '0');
454 if ((tmp / 10) != result) /* overflow? */
455 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
461 /* allow trailing whitespace, but not other trailing chars */
462 while (*ptr != '\0' && isspace((unsigned char) *ptr))
466 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
468 return ((sign < 0) ? -result : result);
471 /* random number generator: uniform distribution from min to max inclusive */
473 getrand(TState *thread, int64 min, int64 max)
476 * Odd coding is so that min and max have approximately the same chance of
477 * being selected as do numbers between them.
479 * pg_erand48() is thread-safe and concurrent, which is why we use it
480 * rather than random(), which in glibc is non-reentrant, and therefore
481 * protected by a mutex, and therefore a bottleneck on machines with many
484 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
488 * random number generator: exponential distribution from min to max inclusive.
489 * the threshold is so that the density of probability for the last cut-off max
490 * value is exp(-threshold).
493 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
499 Assert(threshold > 0.0);
500 cut = exp(-threshold);
501 /* erand in [0, 1), uniform in (0, 1] */
502 uniform = 1.0 - pg_erand48(thread->random_state);
505 * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
507 Assert((1.0 - cut) != 0.0);
508 rand = -log(cut + (1.0 - cut) * uniform) / threshold;
509 /* return int64 random number within between min and max */
510 return min + (int64) ((max - min + 1) * rand);
513 /* random number generator: gaussian distribution from min to max inclusive */
515 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
521 * Get user specified random number from this loop, with -threshold <
524 * This loop is executed until the number is in the expected range.
526 * As the minimum threshold is 2.0, the probability of looping is low:
527 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
528 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
529 * the worst case. For a 5.0 threshold value, the looping probability is
530 * about e^{-5} * 2 / pi ~ 0.43%.
535 * pg_erand48 generates [0,1), but for the basic version of the
536 * Box-Muller transform the two uniformly distributed random numbers
537 * are expected in (0, 1] (see
538 * http://en.wikipedia.org/wiki/Box_muller)
540 double rand1 = 1.0 - pg_erand48(thread->random_state);
541 double rand2 = 1.0 - pg_erand48(thread->random_state);
543 /* Box-Muller basic form transform */
544 double var_sqrt = sqrt(-2.0 * log(rand1));
546 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
549 * we may try with cos, but there may be a bias induced if the
550 * previous value fails the test. To be on the safe side, let us try
554 while (stdev < -threshold || stdev >= threshold);
556 /* stdev is in [-threshold, threshold), normalization to [0,1) */
557 rand = (stdev + threshold) / (threshold * 2.0);
559 /* return int64 random number within between min and max */
560 return min + (int64) ((max - min + 1) * rand);
564 * random number generator: generate a value, such that the series of values
565 * will approximate a Poisson distribution centered on the given value.
568 getPoissonRand(TState *thread, int64 center)
571 * Use inverse transform sampling to generate a value > 0, such that the
572 * expected (i.e. average) value is the given argument.
576 /* erand in [0, 1), uniform in (0, 1] */
577 uniform = 1.0 - pg_erand48(thread->random_state);
579 return (int64) (-log(uniform) * ((double) center) + 0.5);
582 /* call PQexec() and exit() on failure */
584 executeStatement(PGconn *con, const char *sql)
588 res = PQexec(con, sql);
589 if (PQresultStatus(res) != PGRES_COMMAND_OK)
591 fprintf(stderr, "%s", PQerrorMessage(con));
597 /* call PQexec() and complain, but without exiting, on failure */
599 tryExecuteStatement(PGconn *con, const char *sql)
603 res = PQexec(con, sql);
604 if (PQresultStatus(res) != PGRES_COMMAND_OK)
606 fprintf(stderr, "%s", PQerrorMessage(con));
607 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
612 /* set up a connection to the backend */
617 static char *password = NULL;
621 * Start the connection. Loop until we have a password if requested by
626 #define PARAMS_ARRAY_SIZE 7
628 const char *keywords[PARAMS_ARRAY_SIZE];
629 const char *values[PARAMS_ARRAY_SIZE];
631 keywords[0] = "host";
633 keywords[1] = "port";
635 keywords[2] = "user";
637 keywords[3] = "password";
638 values[3] = password;
639 keywords[4] = "dbname";
641 keywords[5] = "fallback_application_name";
642 values[5] = progname;
648 conn = PQconnectdbParams(keywords, values, true);
652 fprintf(stderr, "connection to database \"%s\" failed\n",
657 if (PQstatus(conn) == CONNECTION_BAD &&
658 PQconnectionNeedsPassword(conn) &&
662 password = simple_prompt("Password: ", 100, false);
667 /* check to see that the backend connection was successfully made */
668 if (PQstatus(conn) == CONNECTION_BAD)
670 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
671 dbName, PQerrorMessage(conn));
679 /* throw away response from backend */
681 discard_response(CState *state)
687 res = PQgetResult(state->con);
694 compareVariables(const void *v1, const void *v2)
696 return strcmp(((const Variable *) v1)->name,
697 ((const Variable *) v2)->name);
701 getVariable(CState *st, char *name)
706 /* On some versions of Solaris, bsearch of zero items dumps core */
707 if (st->nvariables <= 0)
711 var = (Variable *) bsearch((void *) &key,
712 (void *) st->variables,
/*
 * Check whether name is a legal variable name.
 *
 * A legal name is non-empty and consists only of alphanumeric characters
 * (as classified by isalnum()) and underscores.
 *
 * Returns true if the name is legal, false otherwise.
 */
static bool
isLegalVariableName(const char *name)
{
	int			i;

	for (i = 0; name[i] != '\0'; i++)
	{
		/* cast to unsigned char: passing a negative char to isalnum() is UB */
		if (!isalnum((unsigned char) name[i]) && name[i] != '_')
			return false;
	}

	/* an empty string has no illegal characters, but is still not a name */
	return (i > 0);
}
738 putVariable(CState *st, const char *context, char *name, char *value)
744 /* On some versions of Solaris, bsearch of zero items dumps core */
745 if (st->nvariables > 0)
746 var = (Variable *) bsearch((void *) &key,
747 (void *) st->variables,
759 * Check for the name only when declaring a new variable to avoid
762 if (!isLegalVariableName(name))
764 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
770 newvars = (Variable *) pg_realloc(st->variables,
771 (st->nvariables + 1) * sizeof(Variable));
773 newvars = (Variable *) pg_malloc(sizeof(Variable));
775 st->variables = newvars;
777 var = &newvars[st->nvariables];
779 var->name = pg_strdup(name);
780 var->value = pg_strdup(value);
784 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
791 /* dup then free, in case value is pointing at this variable */
792 val = pg_strdup(value);
802 parseVariable(const char *sql, int *eaten)
810 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
815 memcpy(name, &sql[1], i - 1);
823 replaceVariable(char **sql, char *param, int len, char *value)
825 int valueln = strlen(value);
829 size_t offset = param - *sql;
831 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
832 param = *sql + offset;
836 memmove(param + valueln, param + len, strlen(param + len) + 1);
837 memcpy(param, value, valueln);
839 return param + valueln;
843 assignVariables(CState *st, char *sql)
850 while ((p = strchr(p, ':')) != NULL)
854 name = parseVariable(p, &eaten);
864 val = getVariable(st, name);
872 p = replaceVariable(&sql, p, eaten, val);
879 getQueryParams(CState *st, const Command *command, const char **params)
883 for (i = 0; i < command->argc - 1; i++)
884 params[i] = getVariable(st, command->argv[i + 1]);
888 * Recursive evaluation of an expression in a pgbench script
889 * using the current state of variables.
890 * Returns whether the evaluation was ok,
891 * the value itself is returned through the retval pointer.
894 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
898 case ENODE_INTEGER_CONSTANT:
900 *retval = expr->u.integer_constant.ival;
908 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
910 fprintf(stderr, "undefined variable \"%s\"\n",
911 expr->u.variable.varname);
914 *retval = strtoint64(var);
923 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
925 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
927 switch (expr->u.operator.operator)
930 *retval = lval + rval;
934 *retval = lval - rval;
938 *retval = lval * rval;
944 fprintf(stderr, "division by zero\n");
947 *retval = lval / rval;
953 fprintf(stderr, "division by zero\n");
956 *retval = lval % rval;
960 fprintf(stderr, "bad operator\n");
968 fprintf(stderr, "bad expression\n");
973 * Run a shell command. The result is assigned to the variable if not NULL.
974 * Return true if succeeded, or false on error.
977 runShellCommand(CState *st, char *variable, char **argv, int argc)
979 char command[SHELL_COMMAND_SIZE];
988 * Join arguments with whitespace separators. Arguments starting with
989 * exactly one colon are treated as variables:
990 * name - append a string "name"
991 * :var - append a variable named 'var'
992 * ::name - append a string ":name"
995 for (i = 0; i < argc; i++)
1000 if (argv[i][0] != ':')
1002 arg = argv[i]; /* a string literal */
1004 else if (argv[i][1] == ':')
1006 arg = argv[i] + 1; /* a string literal starting with colons */
1008 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1010 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1015 arglen = strlen(arg);
1016 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1018 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1023 command[len++] = ' ';
1024 memcpy(command + len, arg, arglen);
1028 command[len] = '\0';
1030 /* Fast path for non-assignment case */
1031 if (variable == NULL)
1033 if (system(command))
1035 if (!timer_exceeded)
1036 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1042 /* Execute the command with pipe and read the standard output. */
1043 if ((fp = popen(command, "r")) == NULL)
1045 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1048 if (fgets(res, sizeof(res), fp) == NULL)
1050 if (!timer_exceeded)
1051 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1057 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1061 /* Check whether the result is an integer and assign it to the variable */
1062 retval = (int) strtol(res, &endptr, 10);
1063 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1065 if (*res == '\0' || *endptr != '\0')
1067 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1071 snprintf(res, sizeof(res), "%d", retval);
1072 if (!putVariable(st, "setshell", variable, res))
1076 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
/* size of the buffer callers provide for a prepared statement name */
#define MAX_PREPARE_NAME		32

/*
 * Write the name of the prepared statement for command number "state" of
 * script file number "file" into buffer, in the form "P<file>_<state>".
 *
 * The longest possible result for two ints is 24 characters plus the
 * terminator, which fits within MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1089 clientDone(CState *st, bool ok)
1091 (void) ok; /* unused */
1093 if (st->con != NULL)
1098 return false; /* always false */
1102 agg_vals_init(AggVals *aggs, instr_time start)
1104 /* basic counters */
1105 aggs->cnt = 0; /* number of transactions (includes skipped) */
1106 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1108 aggs->sum_latency = 0; /* SUM(latency) */
1109 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1111 /* min and max transaction duration */
1112 aggs->min_latency = 0;
1113 aggs->max_latency = 0;
1115 /* schedule lag counters */
1121 /* start of the current interval */
1122 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1125 /* return false iff client should be disconnected */
1127 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1131 bool trans_needs_throttle = false;
1135 * gettimeofday() isn't free, so we get the current timestamp lazily the
1136 * first time it's needed, and reuse the same value throughout this
1137 * function after that. This also ensures that e.g. the calculated latency
1138 * reported in the log file and in the totals are the same. Zero means
1139 * "not set yet". Reset "now" when we step to the next command with "goto
1143 INSTR_TIME_SET_ZERO(now);
1145 commands = sql_files[st->use_file];
1148 * Handle throttling once per transaction by sleeping. It is simpler to
1149 * do this here rather than at the end, because so much complicated logic
1150 * happens below when statements finish.
1152 if (throttle_delay && !st->is_throttled)
1155 * Generate a delay such that the series of delays will approximate a
1156 * Poisson distribution centered on the throttle_delay time.
1158 * If transactions are too slow or a given wait is shorter than a
1159 * transaction, the next transaction will start right away.
1161 int64 wait = getPoissonRand(thread, throttle_delay);
1163 thread->throttle_trigger += wait;
1164 st->txn_scheduled = thread->throttle_trigger;
1167 * If --latency-limit is used, and this slot is already late so
1168 * that the transaction will miss the latency limit even if it
1169 * completed immediately, we skip this time slot and iterate till the
1170 * next slot that isn't late yet.
1176 if (INSTR_TIME_IS_ZERO(now))
1177 INSTR_TIME_SET_CURRENT(now);
1178 now_us = INSTR_TIME_GET_MICROSEC(now);
1179 while (thread->throttle_trigger < now_us - latency_limit)
1181 thread->throttle_latency_skipped++;
1184 doLog(thread, st, logfile, &now, agg, true);
1186 wait = getPoissonRand(thread, throttle_delay);
1187 thread->throttle_trigger += wait;
1188 st->txn_scheduled = thread->throttle_trigger;
1193 st->throttling = true;
1194 st->is_throttled = true;
1196 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1201 { /* are we sleeping? */
1204 if (INSTR_TIME_IS_ZERO(now))
1205 INSTR_TIME_SET_CURRENT(now);
1206 now_us = INSTR_TIME_GET_MICROSEC(now);
1207 if (st->txn_scheduled <= now_us)
1209 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1212 /* Measure lag of throttled transaction relative to target */
1213 int64 lag = now_us - st->txn_scheduled;
1215 thread->throttle_lag += lag;
1216 if (lag > thread->throttle_lag_max)
1217 thread->throttle_lag_max = lag;
1218 st->throttling = false;
1222 return true; /* Still sleeping, nothing to do here */
1226 { /* are we receiver? */
1227 if (commands[st->state]->type == SQL_COMMAND)
1230 fprintf(stderr, "client %d receiving\n", st->id);
1231 if (!PQconsumeInput(st->con))
1232 { /* there's something wrong */
1233 fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1234 return clientDone(st, false);
1236 if (PQisBusy(st->con))
1237 return true; /* don't have the whole result yet */
1241 * command finished: accumulate per-command execution times in
1242 * thread-local data structure, if per-command latencies are requested
1246 int cnum = commands[st->state]->command_num;
1248 if (INSTR_TIME_IS_ZERO(now))
1249 INSTR_TIME_SET_CURRENT(now);
1250 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1251 now, st->stmt_begin);
1252 thread->exec_count[cnum]++;
1255 /* transaction finished: calculate latency and log the transaction */
1256 if (commands[st->state + 1] == NULL)
1258 /* only calculate latency if an option is used that needs it */
1259 if (progress || throttle_delay || latency_limit)
1263 if (INSTR_TIME_IS_ZERO(now))
1264 INSTR_TIME_SET_CURRENT(now);
1266 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1268 st->txn_latencies += latency;
1271 * XXX In a long benchmark run of high-latency transactions,
1272 * this int64 addition eventually overflows. For example, 100
1273 * threads running 10s transactions will overflow it in 2.56
1274 * hours. With a more-typical OLTP workload of .1s
1275 * transactions, overflow would take 256 hours.
1277 st->txn_sqlats += latency * latency;
1279 /* record over-the-limit transactions if needed. */
1280 if (latency_limit && latency > latency_limit)
1281 thread->latency_late++;
1284 /* record the time it took in the log */
1286 doLog(thread, st, logfile, &now, agg, false);
1289 if (commands[st->state]->type == SQL_COMMAND)
1292 * Read and discard the query result; note this is not included in
1293 * the statement latency numbers.
1295 res = PQgetResult(st->con);
1296 switch (PQresultStatus(res))
1298 case PGRES_COMMAND_OK:
1299 case PGRES_TUPLES_OK:
1302 fprintf(stderr, "client %d aborted in state %d: %s",
1303 st->id, st->state, PQerrorMessage(st->con));
1305 return clientDone(st, false);
1308 discard_response(st);
1311 if (commands[st->state + 1] == NULL)
1320 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1321 return clientDone(st, true); /* exit success */
1324 /* increment state counter */
1326 if (commands[st->state] == NULL)
1329 st->use_file = (int) getrand(thread, 0, num_files - 1);
1330 commands = sql_files[st->use_file];
1331 st->is_throttled = false;
1334 * No transaction is underway anymore, which means there is
1335 * nothing to listen to right now. When throttling rate limits
1336 * are active, a sleep will happen next, as the next transaction
1337 * starts. And then in any case the next SQL command will set
1341 trans_needs_throttle = (throttle_delay > 0);
1345 if (st->con == NULL)
1350 INSTR_TIME_SET_CURRENT(start);
1351 if ((st->con = doConnect()) == NULL)
1353 fprintf(stderr, "client %d aborted while establishing connection\n",
1355 return clientDone(st, false);
1357 INSTR_TIME_SET_CURRENT(end);
1358 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1362 * This ensures that a throttling delay is inserted before proceeding with
1363 * sql commands, after the first transaction. The first transaction
1364 * throttling is performed when first entering doCustom.
1366 if (trans_needs_throttle)
1368 trans_needs_throttle = false;
1372 /* Record transaction start time under logging, progress or throttling */
1373 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1375 INSTR_TIME_SET_CURRENT(st->txn_begin);
1378 * When not throttling, this is also the transaction's scheduled start
1381 if (!throttle_delay)
1382 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1385 /* Record statement start time if per-command latencies are requested */
1387 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1389 if (commands[st->state]->type == SQL_COMMAND)
1391 const Command *command = commands[st->state];
1394 if (querymode == QUERY_SIMPLE)
1398 sql = pg_strdup(command->argv[0]);
1399 sql = assignVariables(st, sql);
1402 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1403 r = PQsendQuery(st->con, sql);
1406 else if (querymode == QUERY_EXTENDED)
1408 const char *sql = command->argv[0];
1409 const char *params[MAX_ARGS];
1411 getQueryParams(st, command, params);
1414 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1415 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1416 NULL, params, NULL, NULL, 0);
1418 else if (querymode == QUERY_PREPARED)
1420 char name[MAX_PREPARE_NAME];
1421 const char *params[MAX_ARGS];
1423 if (!st->prepared[st->use_file])
1427 for (j = 0; commands[j] != NULL; j++)
1430 char name[MAX_PREPARE_NAME];
1432 if (commands[j]->type != SQL_COMMAND)
1434 preparedStatementName(name, st->use_file, j);
1435 res = PQprepare(st->con, name,
1436 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1437 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1438 fprintf(stderr, "%s", PQerrorMessage(st->con));
1441 st->prepared[st->use_file] = true;
1444 getQueryParams(st, command, params);
1445 preparedStatementName(name, st->use_file, st->state);
1448 fprintf(stderr, "client %d sending %s\n", st->id, name);
1449 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1450 params, NULL, NULL, 0);
1452 else /* unknown sql mode */
1458 fprintf(stderr, "client %d could not send %s\n",
1459 st->id, command->argv[0]);
1463 st->listen = 1; /* flags that should be listened */
1465 else if (commands[st->state]->type == META_COMMAND)
1467 int argc = commands[st->state]->argc,
1469 char **argv = commands[st->state]->argv;
1473 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1474 for (i = 1; i < argc; i++)
1475 fprintf(stderr, " %s", argv[i]);
1476 fprintf(stderr, "\n");
1479 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1484 double threshold = 0;
1487 if (*argv[2] == ':')
1489 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1491 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1496 min = strtoint64(var);
1499 min = strtoint64(argv[2]);
1501 if (*argv[3] == ':')
1503 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1505 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1510 max = strtoint64(var);
1513 max = strtoint64(argv[3]);
1517 fprintf(stderr, "%s: \\setrandom maximum is less than minimum\n",
1524 * Generate random number functions need to be able to subtract
1525 * max from min and add one to the result without overflowing.
1526 * Since we know max > min, we can detect overflow just by
1527 * checking for a negative result. But we must check both that the
1528 * subtraction doesn't overflow, and that adding one to the result
1529 * doesn't overflow either.
1531 if (max - min < 0 || (max - min) + 1 < 0)
1533 fprintf(stderr, "%s: \\setrandom range is too large\n",
1539 if (argc == 4 || /* uniform without or with "uniform" keyword */
1540 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1543 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1545 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1547 else if (argc == 6 &&
1548 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1549 (pg_strcasecmp(argv[4], "exponential") == 0)))
1551 if (*argv[5] == ':')
1553 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1555 fprintf(stderr, "%s: invalid threshold number: \"%s\"\n",
1560 threshold = strtod(var, NULL);
1563 threshold = strtod(argv[5], NULL);
1565 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1567 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1569 fprintf(stderr, "gaussian threshold must be at least %f (not \"%s\")\n", MIN_GAUSSIAN_THRESHOLD, argv[5]);
1574 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1576 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1578 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1580 if (threshold <= 0.0)
1582 fprintf(stderr, "exponential threshold must be greater than zero (not \"%s\")\n", argv[5]);
1587 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1589 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1592 else /* this means an error somewhere in the parsing phase... */
1594 fprintf(stderr, "%s: invalid arguments for \\setrandom\n",
1600 if (!putVariable(st, argv[0], argv[1], res))
1608 else if (pg_strcasecmp(argv[0], "set") == 0)
1611 PgBenchExpr *expr = commands[st->state]->expr;
1614 if (!evaluateExpr(st, expr, &result))
1619 sprintf(res, INT64_FORMAT, result);
1621 if (!putVariable(st, argv[0], argv[1], res))
1629 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1635 if (*argv[1] == ':')
1637 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1639 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1647 usec = atoi(argv[1]);
1651 if (pg_strcasecmp(argv[2], "ms") == 0)
1653 else if (pg_strcasecmp(argv[2], "s") == 0)
1659 INSTR_TIME_SET_CURRENT(now);
1660 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1665 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1667 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1669 if (timer_exceeded) /* timeout */
1670 return clientDone(st, true);
1671 else if (!ret) /* on error */
1676 else /* succeeded */
1679 else if (pg_strcasecmp(argv[0], "shell") == 0)
1681 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1683 if (timer_exceeded) /* timeout */
1684 return clientDone(st, true);
1685 else if (!ret) /* on error */
1690 else /* succeeded */
/*
 * doLog -- record one finished (or skipped) transaction in the log file.
 *
 * When agg_interval > 0 the transaction is folded into the per-interval
 * aggregate *agg, and one line per elapsed interval is written once the
 * current time moves past the interval; otherwise a single raw line is
 * written for the transaction.
 *
 * NOTE(review): this listing is missing some lines of the function (braces,
 * short declarations and a few branch heads), so the notes below describe
 * only the statements that are visible here.
 */
1700 * print log entry after completing one transaction.
1703 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1710 * Skip the log entry if sampling is enabled and this row doesn't belong
1711 * to the random sample.
/* sample_rate == 0.0 disables sampling; otherwise one random draw per call */
1713 if (sample_rate != 0.0 &&
1714 pg_erand48(thread->random_state) > sample_rate)
/* fill in *now with the current time if it is still zero */
1717 if (INSTR_TIME_IS_ZERO(*now))
1718 INSTR_TIME_SET_CURRENT(*now);
/* latency: microseconds elapsed between the scheduled start and *now */
1720 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
/* lag: microseconds between the scheduled start and the actual begin */
1724 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1726 /* should we aggregate the results or not? */
1727 if (agg_interval > 0)
1730 * Are we still in the same interval? If yes, accumulate the values
1731 * (print them otherwise)
1733 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1739 * there is no latency to record if the transaction was
/* running sum and sum of squares of the latency for this interval */
1746 agg->sum_latency += latency;
1747 agg->sum2_latency += latency * latency;
1749 /* first in this aggregation interval */
/* cnt == 1 forces min/max to be seeded from this transaction */
1750 if ((agg->cnt == 1) || (latency < agg->min_latency))
1751 agg->min_latency = latency;
1753 if ((agg->cnt == 1) || (latency > agg->max_latency))
1754 agg->max_latency = latency;
1756 /* and the same for schedule lag */
1759 agg->sum_lag += lag;
1760 agg->sum2_lag += lag * lag;
1762 if ((agg->cnt == 1) || (lag < agg->min_lag))
1764 if ((agg->cnt == 1) || (lag > agg->max_lag))
1772 * Loop until we reach the interval of the current transaction
1773 * (and print all the empty intervals in between).
1775 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1778 * This is a non-Windows branch (thanks to the ifdef in
1779 * usage), so we don't need to handle this in a special way
/* one aggregate line per interval; the lag and skipped columns are appended by the following fprintf calls */
1782 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1791 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1797 fprintf(logfile, " %d", agg->skipped);
1799 fputc('\n', logfile);
1801 /* move to the next interval */
1802 agg->start_time = agg->start_time + agg_interval;
1804 /* reset for "no transaction" intervals */
1807 agg->min_latency = 0;
1808 agg->max_latency = 0;
1809 agg->sum_latency = 0;
1810 agg->sum2_latency = 0;
1817 /* reset the values to include only the current transaction. */
/* a skipped transaction counts in "skipped" but contributes no latency sums */
1819 agg->skipped = skipped ? 1 : 0;
1820 agg->min_latency = latency;
1821 agg->max_latency = latency;
1822 agg->sum_latency = skipped ? 0.0 : latency;
1823 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1827 agg->sum2_lag = lag * lag;
1832 /* no, print raw transactions */
1835 /* This is more than we really ought to know about instr_time */
/* raw line: client id, transaction count, latency (or the word "skipped"), script index, seconds, microseconds */
1837 fprintf(logfile, "%d %d skipped %d %ld %ld",
1838 st->id, st->cnt, st->use_file,
1839 (long) now->tv_sec, (long) now->tv_usec);
1841 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1842 st->id, st->cnt, latency, st->use_file,
1843 (long) now->tv_sec, (long) now->tv_usec);
1846 /* On Windows, instr_time doesn't provide a timestamp anyway */
/* same layout as above, with both timestamp columns written as 0 */
1848 fprintf(logfile, "%d %d skipped %d 0 0",
1849 st->id, st->cnt, st->use_file);
1851 fprintf(logfile, "%d %d %.0f %d 0 0",
1852 st->id, st->cnt, latency, st->use_file);
/* optional trailing schedule-lag column, then end of record */
1855 fprintf(logfile, " %.0f", lag);
1856 fputc('\n', logfile);
1860 /* discard connections */
1862 disconnect_all(CState *state, int length)
1866 for (i = 0; i < length; i++)
1870 PQfinish(state[i].con);
1871 state[i].con = NULL;
/*
 * init -- (re)create the pgbench tables and load the initial data set.
 *
 * Steps visible in this listing, in order: drop and re-create each table
 * described in DDLs[], insert the branch and teller rows, bulk-load
 * pgbench_accounts through COPY while printing progress, vacuum the four
 * tables, add the primary keys from DDLINDEXes[] and the foreign keys from
 * DDLKEYs[].
 *
 * NOTE(review): this listing is missing some lines of the function (braces,
 * short declarations, the conditions guarding the vacuum and foreign-key
 * steps), so the notes below describe only the statements that are visible.
 */
1876 /* create tables and setup data */
1878 init(bool is_no_vacuum)
1881 * The scale factor at/beyond which 32-bit integers are insufficient for
1882 * storing TPC-B account IDs.
1884 * Although the actual threshold is 21474, we use 20000 because it is easier to
1885 * document and remember, and isn't that far away from the real threshold.
1887 #define SCALE_32BIT_THRESHOLD 20000
1890 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1891 * fields in these table declarations were intended to comply with that.
1892 * The pgbench_accounts table complies with that because the "filler"
1893 * column is set to blank-padded empty string. But for all other tables
1894 * the columns default to NULL and so don't actually take any space. We
1895 * could fix that by giving them non-null default values. However, that
1896 * would completely break comparability of pgbench results with prior
1897 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1898 * anyway, we stick with the historical behavior.
1902 const char *table; /* table name */
1903 const char *smcols; /* column decls if accountIDs are 32 bits */
1904 const char *bigcols; /* column decls if accountIDs are 64 bits */
/* non-zero: append " with (fillfactor=N)" to the create table statement */
1905 int declare_fillfactor;
1907 static const struct ddlinfo DDLs[] = {
1910 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1911 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1916 "tid int not null,bid int,tbalance int,filler char(84)",
1917 "tid int not null,bid int,tbalance int,filler char(84)",
1922 "aid int not null,bid int,abalance int,filler char(84)",
1923 "aid bigint not null,bid int,abalance int,filler char(84)",
1928 "bid int not null,bbalance int,filler char(88)",
1929 "bid int not null,bbalance int,filler char(88)",
1933 static const char *const DDLINDEXes[] = {
1934 "alter table pgbench_branches add primary key (bid)",
1935 "alter table pgbench_tellers add primary key (tid)",
1936 "alter table pgbench_accounts add primary key (aid)"
1938 static const char *const DDLKEYs[] = {
1939 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1940 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1941 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1942 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1943 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1952 /* used to track elapsed time and estimate of the remaining time */
/* index of the next progress interval to report in quiet mode (see below) */
1957 int log_interval = 1;
1959 if ((con = doConnect()) == NULL)
1962 for (i = 0; i < lengthof(DDLs); i++)
1966 const struct ddlinfo *ddl = &DDLs[i];
1969 /* Remove old table, if it exists. */
1970 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1971 executeStatement(con, buffer);
1973 /* Construct new create table statement. */
/* opts accumulates the optional fillfactor and tablespace clauses */
1975 if (ddl->declare_fillfactor)
1976 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1977 " with (fillfactor=%d)", fillfactor);
1978 if (tablespace != NULL)
1980 char *escape_tablespace;
/* NOTE(review): the result of PQescapeIdentifier is used without a NULL check in the visible lines */
1982 escape_tablespace = PQescapeIdentifier(con, tablespace,
1983 strlen(tablespace));
1984 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1985 " tablespace %s", escape_tablespace);
1986 PQfreemem(escape_tablespace);
/* switch to the bigint column set once scale reaches SCALE_32BIT_THRESHOLD */
1989 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1991 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1992 unlogged_tables ? " unlogged" : "",
1993 ddl->table, cols, opts);
1995 executeStatement(con, buffer);
/* branches and tellers are inserted row by row inside one transaction */
1998 executeStatement(con, "begin");
2000 for (i = 0; i < nbranches * scale; i++)
2002 /* "filler" column defaults to NULL */
2003 snprintf(sql, sizeof(sql),
2004 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2006 executeStatement(con, sql);
2009 for (i = 0; i < ntellers * scale; i++)
2011 /* "filler" column defaults to NULL */
2012 snprintf(sql, sizeof(sql),
2013 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2014 i + 1, i / ntellers + 1);
2015 executeStatement(con, sql);
2018 executeStatement(con, "commit");
2021 * fill the pgbench_accounts table with some data
2023 fprintf(stderr, "creating tables...\n");
/* the accounts are streamed to the server through COPY ... FROM STDIN */
2025 executeStatement(con, "begin");
2026 executeStatement(con, "truncate pgbench_accounts");
2028 res = PQexec(con, "copy pgbench_accounts from stdin");
2029 if (PQresultStatus(res) != PGRES_COPY_IN)
2031 fprintf(stderr, "%s", PQerrorMessage(con));
2036 INSTR_TIME_SET_CURRENT(start);
/* the row count is computed in 64 bits, so it may exceed the int range */
2038 for (k = 0; k < (int64) naccounts * scale; k++)
2042 /* "filler" column defaults to blank padded empty string */
/* one tab-separated COPY row: aid, bid, abalance = 0, empty filler */
2043 snprintf(sql, sizeof(sql),
2044 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2045 j, k / naccounts + 1, 0);
2046 if (PQputline(con, sql))
2048 fprintf(stderr, "PQputline failed\n");
2053 * If we want to stick with the original logging, print a message each
2054 * 100k inserted rows.
2056 if ((!use_quiet) && (j % 100000 == 0))
2058 INSTR_TIME_SET_CURRENT(diff);
2059 INSTR_TIME_SUBTRACT(diff, start);
/* remaining time estimate: rows still to load times the average time per loaded row */
2061 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2062 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2064 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2065 j, (int64) naccounts * scale,
2066 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2067 elapsed_sec, remaining_sec);
2069 /* let's not call the timing for each row, but only each 100 rows */
2070 else if (use_quiet && (j % 100 == 0))
2072 INSTR_TIME_SET_CURRENT(diff);
2073 INSTR_TIME_SUBTRACT(diff, start);
2075 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2076 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2078 /* have we reached the next interval (or end)? */
/*
 * NOTE(review): unlike every other use in this function, the product
 * scale * naccounts below is not widened to int64 before multiplying;
 * confirm it cannot overflow for scale factors above the 32-bit threshold.
 */
2079 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2081 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2082 j, (int64) naccounts * scale,
2083 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2085 /* skip to the next interval */
2086 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
/* a line holding only "\." terminates the COPY data stream */
2091 if (PQputline(con, "\\.\n"))
2093 fprintf(stderr, "very last PQputline failed\n");
2098 fprintf(stderr, "PQendcopy failed\n");
2101 executeStatement(con, "commit");
2106 fprintf(stderr, "vacuum...\n");
2107 executeStatement(con, "vacuum analyze pgbench_branches");
2108 executeStatement(con, "vacuum analyze pgbench_tellers");
2109 executeStatement(con, "vacuum analyze pgbench_accounts");
2110 executeStatement(con, "vacuum analyze pgbench_history");
2116 fprintf(stderr, "set primary keys...\n");
2117 for (i = 0; i < lengthof(DDLINDEXes); i++)
2121 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
/* optionally place the primary key index in index_tablespace */
2123 if (index_tablespace != NULL)
2125 char *escape_tablespace;
2127 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2128 strlen(index_tablespace));
2129 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2130 " using index tablespace %s", escape_tablespace);
2131 PQfreemem(escape_tablespace);
2134 executeStatement(con, buffer);
2138 * create foreign keys
2142 fprintf(stderr, "set foreign keys...\n");
2143 for (i = 0; i < lengthof(DDLKEYs); i++)
2145 executeStatement(con, DDLKEYs[i]);
2149 fprintf(stderr, "done.\n");
2154 * Parse the raw sql and replace :param to $n.
2157 parseQuery(Command *cmd, const char *raw_sql)
2162 sql = pg_strdup(raw_sql);
2166 while ((p = strchr(p, ':')) != NULL)
2172 name = parseVariable(p, &eaten);
2182 if (cmd->argc >= MAX_ARGS)
2184 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2189 sprintf(var, "$%d", cmd->argc);
2190 p = replaceVariable(&sql, p, eaten, var);
2192 cmd->argv[cmd->argc] = name;
2200 void pg_attribute_noreturn()
2201 syntax_error(const char *source, const int lineno,
2202 const char *line, const char *command,
2203 const char *msg, const char *more, const int column)
2205 fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2207 fprintf(stderr, " (%s)", more);
2209 fprintf(stderr, " at column %d", column);
2210 fprintf(stderr, " in command \"%s\"\n", command);
2213 fprintf(stderr, "%s\n", line);
2218 for (i = 0; i < column - 1; i++)
2219 fprintf(stderr, " ");
2220 fprintf(stderr, "^ error found here\n");
2226 /* Parse a command; return a Command struct, or NULL if it's a comment */
2228 process_commands(char *buf, const char *source, const int lineno)
2230 const char delim[] = " \f\n\r\t\v";
2232 Command *my_commands;
2237 /* Make the string buf end at the next newline */
2238 if ((p = strchr(buf, '\n')) != NULL)
2241 /* Skip leading whitespace */
2243 while (isspace((unsigned char) *p))
2246 /* If the line is empty or actually a comment, we're done */
2247 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2250 /* Allocate and initialize Command structure */
2251 my_commands = (Command *) pg_malloc(sizeof(Command));
2252 my_commands->line = pg_strdup(buf);
2253 my_commands->command_num = num_commands++;
2254 my_commands->type = 0; /* until set */
2255 my_commands->argc = 0;
2261 my_commands->type = META_COMMAND;
2264 tok = strtok(++p, delim);
2266 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2271 my_commands->cols[j] = tok - buf + 1;
2272 my_commands->argv[j++] = pg_strdup(tok);
2273 my_commands->argc++;
2274 if (max_args >= 0 && my_commands->argc >= max_args)
2275 tok = strtok(NULL, "");
2277 tok = strtok(NULL, delim);
2280 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2283 * parsing: \setrandom variable min max [uniform] \setrandom
2284 * variable min max (gaussian|exponential) threshold
2287 if (my_commands->argc < 4)
2289 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2290 "missing arguments", NULL, -1);
2295 if (my_commands->argc == 4 || /* uniform without/with
2296 * "uniform" keyword */
2297 (my_commands->argc == 5 &&
2298 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2302 else if ( /* argc >= 5 */
2303 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2304 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2306 if (my_commands->argc < 6)
2308 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2309 "missing threshold argument", my_commands->argv[4], -1);
2311 else if (my_commands->argc > 6)
2313 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2314 "too many arguments", my_commands->argv[4],
2315 my_commands->cols[6]);
2318 else /* cannot parse, unexpected arguments */
2320 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2321 "unexpected argument", my_commands->argv[4],
2322 my_commands->cols[4]);
2325 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2327 if (my_commands->argc < 3)
2329 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2330 "missing argument", NULL, -1);
2333 expr_scanner_init(my_commands->argv[2], source, lineno,
2334 my_commands->line, my_commands->argv[0],
2335 my_commands->cols[2] - 1);
2337 if (expr_yyparse() != 0)
2339 /* dead code: exit done from syntax_error called by yyerror */
2343 my_commands->expr = expr_parse_result;
2345 expr_scanner_finish();
2347 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2349 if (my_commands->argc < 2)
2351 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2352 "missing argument", NULL, -1);
2356 * Split argument into number and unit to allow "sleep 1ms" etc.
2357 * We don't have to terminate the number argument with null
2358 * because it will be parsed with atoi, which ignores trailing
2359 * non-digit characters.
2361 if (my_commands->argv[1][0] != ':')
2363 char *c = my_commands->argv[1];
2365 while (isdigit((unsigned char) *c))
2369 my_commands->argv[2] = c;
2370 if (my_commands->argc < 3)
2371 my_commands->argc = 3;
2375 if (my_commands->argc >= 3)
2377 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2378 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2379 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2381 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2382 "unknown time unit, must be us, ms or s",
2383 my_commands->argv[2], my_commands->cols[2]);
2387 /* this should be an error?! */
2388 for (j = 3; j < my_commands->argc; j++)
2389 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2390 my_commands->argv[0], my_commands->argv[j]);
2392 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2394 if (my_commands->argc < 3)
2396 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2397 "missing argument", NULL, -1);
2400 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
2402 if (my_commands->argc < 1)
2404 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2405 "missing command", NULL, -1);
2410 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2411 "invalid command", NULL, -1);
2416 my_commands->type = SQL_COMMAND;
2421 my_commands->argv[0] = pg_strdup(p);
2422 my_commands->argc++;
2424 case QUERY_EXTENDED:
2425 case QUERY_PREPARED:
2426 if (!parseQuery(my_commands, p))
/*
 * Read a line from fd, and return it in a malloc'd buffer.
 * Return NULL at EOF.
 *
 * The line is collected in BUFSIZ-sized pieces until a piece ending in a
 * newline has been appended (the newline is kept) or fgets() reports
 * end of input.  The caller releases the returned buffer with free().
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the line.
 */
static char *
read_line_from_file(FILE *fd)
{
	char		tmpbuf[BUFSIZ];
	char	   *buf;
	size_t		buflen = BUFSIZ;
	size_t		used = 0;

	/*
	 * Allocate with pg_malloc so that the buffer comes from the same
	 * allocator family as the pg_realloc() below and the free() calls that
	 * release it.
	 */
	buf = (char *) pg_malloc(buflen);
	buf[0] = '\0';

	while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
	{
		size_t		thislen = strlen(tmpbuf);

		/* Append tmpbuf to whatever we had already */
		memcpy(buf + used, tmpbuf, thislen + 1);
		used += thislen;

		/* Done if we collected a newline */
		if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
			break;

		/* Else, enlarge buf to ensure we can append next bufferload */
		buflen += BUFSIZ;
		buf = (char *) pg_realloc(buf, buflen);
	}

	if (used > 0)
		return buf;

	/* Reached EOF */
	free(buf);
	return NULL;
}
2481 process_file(char *filename)
2483 #define COMMANDS_ALLOC_NUM 128
2485 Command **my_commands;
2492 if (num_files >= MAX_FILES)
2494 fprintf(stderr, "at most %d SQL files are allowed\n", MAX_FILES);
2498 alloc_num = COMMANDS_ALLOC_NUM;
2499 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2501 if (strcmp(filename, "-") == 0)
2503 else if ((fd = fopen(filename, "r")) == NULL)
2505 fprintf(stderr, "could not open file \"%s\": %s\n",
2506 filename, strerror(errno));
2507 pg_free(my_commands);
2514 while ((buf = read_line_from_file(fd)) != NULL)
2520 command = process_commands(buf, filename, lineno);
2524 if (command == NULL)
2527 my_commands[index] = command;
2530 if (index >= alloc_num)
2532 alloc_num += COMMANDS_ALLOC_NUM;
2533 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2538 my_commands[index] = NULL;
2540 sql_files[num_files++] = my_commands;
2546 process_builtin(char *tb, const char *source)
2548 #define COMMANDS_ALLOC_NUM 128
2550 Command **my_commands;
2556 alloc_num = COMMANDS_ALLOC_NUM;
2557 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2568 while (*tb && *tb != '\n')
2581 command = process_commands(buf, source, lineno);
2582 if (command == NULL)
2585 my_commands[index] = command;
2588 if (index >= alloc_num)
2590 alloc_num += COMMANDS_ALLOC_NUM;
2591 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2595 my_commands[index] = NULL;
/*
 * printResults -- print the end-of-run summary on stdout: the run
 * parameters, the number of transactions processed, latency figures, the
 * two tps values and the per-statement latencies of every script.
 *
 * NOTE(review): this listing is missing some lines of the function (braces,
 * short declarations and several branch heads), so the notes below describe
 * only the statements that are visible here.
 */
2600 /* print out results */
2602 printResults(int ttype, int64 normal_xacts, int nclients,
2603 TState *threads, int nthreads,
2604 instr_time total_time, instr_time conn_total_time,
2605 int64 total_latencies, int64 total_sqlats,
2606 int64 throttle_lag, int64 throttle_lag_max,
2607 int64 throttle_latency_skipped, int64 latency_late)
2609 double time_include,
/*
 * tps_include divides by the whole run time; tps_exclude first subtracts
 * the connection time averaged over the threads.
 */
2614 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2615 tps_include = normal_xacts / time_include;
2616 tps_exclude = normal_xacts / (time_include -
2617 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
/* s: label for the transaction type, chosen from ttype */
2620 s = "TPC-B (sort of)";
2621 else if (ttype == 2)
2622 s = "Update only pgbench_accounts";
2623 else if (ttype == 1)
2628 printf("transaction type: %s\n", s);
2629 printf("scaling factor: %d\n", scale);
2630 printf("query mode: %s\n", QUERYMODE[querymode]);
2631 printf("number of clients: %d\n", nclients);
2632 printf("number of threads: %d\n", nthreads);
/* two report variants follow: one for a per-client transaction count, one for a fixed duration */
2635 printf("number of transactions per client: %d\n", nxacts);
2636 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2637 normal_xacts, (int64) nxacts * nclients);
2641 printf("duration: %d s\n", duration);
2642 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2646 /* Remaining stats are nonsensical if we failed to execute any xacts */
2647 if (normal_xacts <= 0)
/* percentages below are relative to skipped plus executed transactions */
2650 if (throttle_delay && latency_limit)
2651 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2652 throttle_latency_skipped,
2653 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
/* latency_limit is divided by 1000 to be shown in ms */
2656 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2657 latency_limit / 1000.0, latency_late,
2658 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2660 if (throttle_delay || progress || latency_limit)
2662 /* compute and show latency average and standard deviation */
/*
 * latency is the mean scaled by 0.001 and sqlat the mean of the squares, so
 * sqlat - 1000000.0 * latency * latency is (mean of squares) - (mean)^2,
 * i.e. the variance in the unit of the accumulated totals.
 */
2663 double latency = 0.001 * total_latencies / normal_xacts;
2664 double sqlat = (double) total_sqlats / normal_xacts;
2666 printf("latency average: %.3f ms\n"
2667 "latency stddev: %.3f ms\n",
2668 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2672 /* only an average latency computed from the duration is available */
2673 printf("latency average: %.3f ms\n",
2674 1000.0 * duration * nclients / normal_xacts);
2680 * Report average transaction lag under rate limit throttling. This
2681 * is the delay between scheduled and actual start times for the
2682 * transaction. The measured lag may be caused by thread/client load,
2683 * the database load, or the Poisson throttling process.
2685 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2686 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2689 printf("tps = %f (including connections establishing)\n", tps_include);
2690 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2692 /* Report per-command latencies */
/* one section per script in sql_files[], one output line per command */
2697 for (i = 0; i < num_files; i++)
2702 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2704 printf("statement latencies in milliseconds:\n");
2706 for (commands = sql_files[i]; *commands != NULL; commands++)
2708 Command *command = *commands;
/* command_num indexes the per-thread exec_elapsed[] and exec_count[] arrays */
2709 int cnum = command->command_num;
2711 instr_time total_exec_elapsed;
2712 int total_exec_count;
2715 /* Accumulate per-thread data for command */
2716 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2717 total_exec_count = 0;
2718 for (t = 0; t < nthreads; t++)
2720 TState *thread = &threads[t];
2722 INSTR_TIME_ADD(total_exec_elapsed,
2723 thread->exec_elapsed[cnum]);
2724 total_exec_count += thread->exec_count[cnum];
/* average time per execution in milliseconds; computed only when the command ran at least once */
2727 if (total_exec_count > 0)
2728 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2732 printf("\t%f\t%s\n", total_time, command->line);
2740 main(int argc, char **argv)
2742 static struct option long_options[] = {
2743 /* systematic long/short named options */
2744 {"client", required_argument, NULL, 'c'},
2745 {"connect", no_argument, NULL, 'C'},
2746 {"debug", no_argument, NULL, 'd'},
2747 {"define", required_argument, NULL, 'D'},
2748 {"file", required_argument, NULL, 'f'},
2749 {"fillfactor", required_argument, NULL, 'F'},
2750 {"host", required_argument, NULL, 'h'},
2751 {"initialize", no_argument, NULL, 'i'},
2752 {"jobs", required_argument, NULL, 'j'},
2753 {"log", no_argument, NULL, 'l'},
2754 {"no-vacuum", no_argument, NULL, 'n'},
2755 {"port", required_argument, NULL, 'p'},
2756 {"progress", required_argument, NULL, 'P'},
2757 {"protocol", required_argument, NULL, 'M'},
2758 {"quiet", no_argument, NULL, 'q'},
2759 {"report-latencies", no_argument, NULL, 'r'},
2760 {"scale", required_argument, NULL, 's'},
2761 {"select-only", no_argument, NULL, 'S'},
2762 {"skip-some-updates", no_argument, NULL, 'N'},
2763 {"time", required_argument, NULL, 'T'},
2764 {"transactions", required_argument, NULL, 't'},
2765 {"username", required_argument, NULL, 'U'},
2766 {"vacuum-all", no_argument, NULL, 'v'},
2767 /* long-named only options */
2768 {"foreign-keys", no_argument, &foreign_keys, 1},
2769 {"index-tablespace", required_argument, NULL, 3},
2770 {"tablespace", required_argument, NULL, 2},
2771 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2772 {"sampling-rate", required_argument, NULL, 4},
2773 {"aggregate-interval", required_argument, NULL, 5},
2774 {"rate", required_argument, NULL, 'R'},
2775 {"latency-limit", required_argument, NULL, 'L'},
2780 int nclients = 1; /* default number of simulated clients */
2781 int nthreads = 1; /* default number of threads */
2782 int is_init_mode = 0; /* initialize mode? */
2783 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2784 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2785 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2786 * 2: skip update of branches and tellers */
2788 char *filename = NULL;
2789 bool scale_given = false;
2791 bool benchmarking_option_set = false;
2792 bool initialization_option_set = false;
2794 CState *state; /* status of clients */
2795 TState *threads; /* array of thread */
2797 instr_time start_time; /* start up time */
2798 instr_time total_time;
2799 instr_time conn_total_time;
2800 int64 total_xacts = 0;
2801 int64 total_latencies = 0;
2802 int64 total_sqlats = 0;
2803 int64 throttle_lag = 0;
2804 int64 throttle_lag_max = 0;
2805 int64 throttle_latency_skipped = 0;
2806 int64 latency_late = 0;
2811 #ifdef HAVE_GETRLIMIT
2821 progname = get_progname(argv[0]);
2825 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2830 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2832 puts("pgbench (PostgreSQL) " PG_VERSION);
2838 /* stderr is buffered on Win32. */
2839 setvbuf(stderr, NULL, _IONBF, 0);
2842 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2844 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2846 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2849 state = (CState *) pg_malloc(sizeof(CState));
2850 memset(state, 0, sizeof(CState));
2852 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2860 pghost = pg_strdup(optarg);
2866 do_vacuum_accounts++;
2869 pgport = pg_strdup(optarg);
2876 benchmarking_option_set = true;
2880 benchmarking_option_set = true;
2883 benchmarking_option_set = true;
2884 nclients = atoi(optarg);
2885 if (nclients <= 0 || nclients > MAXCLIENTS)
2887 fprintf(stderr, "invalid number of clients: \"%s\"\n",
2891 #ifdef HAVE_GETRLIMIT
2892 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2893 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2894 #else /* but BSD doesn't ... */
2895 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2896 #endif /* RLIMIT_NOFILE */
2898 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2901 if (rlim.rlim_cur < nclients + 3)
2903 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
2904 nclients + 3, (long) rlim.rlim_cur);
2905 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
2908 #endif /* HAVE_GETRLIMIT */
2910 case 'j': /* jobs */
2911 benchmarking_option_set = true;
2912 nthreads = atoi(optarg);
2915 fprintf(stderr, "invalid number of threads: \"%s\"\n",
2919 #ifndef ENABLE_THREAD_SAFETY
2922 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
2925 #endif /* !ENABLE_THREAD_SAFETY */
2928 benchmarking_option_set = true;
2932 benchmarking_option_set = true;
2933 is_latencies = true;
2937 scale = atoi(optarg);
2940 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
2945 benchmarking_option_set = true;
2948 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2951 nxacts = atoi(optarg);
2954 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
2960 benchmarking_option_set = true;
2963 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2966 duration = atoi(optarg);
2969 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
2974 login = pg_strdup(optarg);
2977 benchmarking_option_set = true;
2981 initialization_option_set = true;
2985 benchmarking_option_set = true;
2987 filename = pg_strdup(optarg);
2988 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2995 benchmarking_option_set = true;
2997 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2999 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3005 if (!putVariable(&state[0], "option", optarg, p))
3010 initialization_option_set = true;
3011 fillfactor = atoi(optarg);
3012 if (fillfactor < 10 || fillfactor > 100)
3014 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3019 benchmarking_option_set = true;
3022 fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f)\n");
3025 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3026 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3028 if (querymode >= NUM_QUERYMODE)
3030 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3036 benchmarking_option_set = true;
3037 progress = atoi(optarg);
3040 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3047 /* get a double from the beginning of option value */
3048 double throttle_value = atof(optarg);
3050 benchmarking_option_set = true;
3052 if (throttle_value <= 0.0)
3054 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3057 /* Invert rate limit into a time offset */
3058 throttle_delay = (int64) (1000000.0 / throttle_value);
3063 double limit_ms = atof(optarg);
3065 if (limit_ms <= 0.0)
3067 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3071 benchmarking_option_set = true;
3072 latency_limit = (int64) (limit_ms * 1000);
3076 /* This covers long options which take no argument. */
3077 if (foreign_keys || unlogged_tables)
3078 initialization_option_set = true;
3080 case 2: /* tablespace */
3081 initialization_option_set = true;
3082 tablespace = pg_strdup(optarg);
3084 case 3: /* index-tablespace */
3085 initialization_option_set = true;
3086 index_tablespace = pg_strdup(optarg);
3089 benchmarking_option_set = true;
3090 sample_rate = atof(optarg);
3091 if (sample_rate <= 0.0 || sample_rate > 1.0)
3093 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3099 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3102 benchmarking_option_set = true;
3103 agg_interval = atoi(optarg);
3104 if (agg_interval <= 0)
3106 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3113 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3120 * Don't need more threads than there are clients. (This is not merely an
3121 * optimization; throttle_delay is calculated incorrectly below if some
3122 * threads have no clients assigned to them.)
3124 if (nthreads > nclients)
3125 nthreads = nclients;
3127 /* compute a per thread delay */
3128 throttle_delay *= nthreads;
3131 dbName = argv[optind];
3134 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3136 else if (login != NULL && *login != '\0')
3144 if (benchmarking_option_set)
3146 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3155 if (initialization_option_set)
3157 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3162 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3163 if (nxacts <= 0 && duration <= 0)
3164 nxacts = DEFAULT_NXACTS;
3166 /* --sampling-rate may be used only with -l */
3167 if (sample_rate > 0.0 && !use_log)
3169 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3173 /* --sampling-rate must not be used with --aggregate-interval */
3174 if (sample_rate > 0.0 && agg_interval > 0)
3176 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3180 if (agg_interval > 0 && !use_log)
3182 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3186 if (duration > 0 && agg_interval > duration)
3188 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3192 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3194 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3199 * save main process id in the global variable because process id will be
3200 * changed after fork.
3202 main_pid = (int) getpid();
3203 progress_nclients = nclients;
3204 progress_nthreads = nthreads;
3208 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3209 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3211 /* copy any -D switch values to all clients */
3212 for (i = 1; i < nclients; i++)
3217 for (j = 0; j < state[0].nvariables; j++)
3219 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3228 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3229 pghost, pgport, nclients, nxacts, dbName);
3231 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3232 pghost, pgport, nclients, duration, dbName);
3235 /* opening connection... */
3240 if (PQstatus(con) == CONNECTION_BAD)
3242 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
3243 fprintf(stderr, "%s", PQerrorMessage(con));
3250 * get the scaling factor that should be same as count(*) from
3251 * pgbench_branches if this is not a custom query
3253 res = PQexec(con, "select count(*) from pgbench_branches");
3254 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3256 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3258 fprintf(stderr, "%s", PQerrorMessage(con));
3259 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
3261 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
3266 scale = atoi(PQgetvalue(res, 0, 0));
3269 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3270 PQgetvalue(res, 0, 0));
3275 /* warn if we override user-given -s switch */
3278 "scale option ignored, using count from pgbench_branches table (%d)\n",
3283 * :scale variables normally get -s or database scale, but don't override
3284 * an explicit -D switch
3286 if (getVariable(&state[0], "scale") == NULL)
3288 snprintf(val, sizeof(val), "%d", scale);
3289 for (i = 0; i < nclients; i++)
3291 if (!putVariable(&state[i], "startup", "scale", val))
3297 * Define a :client_id variable that is unique per connection. But don't
3298 * override an explicit -D switch.
3300 if (getVariable(&state[0], "client_id") == NULL)
3302 for (i = 0; i < nclients; i++)
3304 snprintf(val, sizeof(val), "%d", i);
3305 if (!putVariable(&state[i], "startup", "client_id", val))
3312 fprintf(stderr, "starting vacuum...");
3313 tryExecuteStatement(con, "vacuum pgbench_branches");
3314 tryExecuteStatement(con, "vacuum pgbench_tellers");
3315 tryExecuteStatement(con, "truncate pgbench_history");
3316 fprintf(stderr, "end.\n");
3318 if (do_vacuum_accounts)
3320 fprintf(stderr, "starting vacuum pgbench_accounts...");
3321 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3322 fprintf(stderr, "end.\n");
3327 /* set random seed */
3328 INSTR_TIME_SET_CURRENT(start_time);
3329 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3331 /* process builtin SQL scripts */
3335 sql_files[0] = process_builtin(tpc_b,
3336 "<builtin: TPC-B (sort of)>");
3341 sql_files[0] = process_builtin(select_only,
3342 "<builtin: select only>");
3347 sql_files[0] = process_builtin(simple_update,
3348 "<builtin: simple update>");
3356 /* set up thread data structures */
3357 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3360 for (i = 0; i < nthreads; i++)
3362 TState *thread = &threads[i];
3365 thread->state = &state[nclients_dealt];
3367 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3368 thread->random_state[0] = random();
3369 thread->random_state[1] = random();
3370 thread->random_state[2] = random();
3371 thread->throttle_latency_skipped = 0;
3372 thread->latency_late = 0;
3374 nclients_dealt += thread->nstate;
3378 /* Reserve memory for the thread to store per-command latencies */
3381 thread->exec_elapsed = (instr_time *)
3382 pg_malloc(sizeof(instr_time) * num_commands);
3383 thread->exec_count = (int *)
3384 pg_malloc(sizeof(int) * num_commands);
3386 for (t = 0; t < num_commands; t++)
3388 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3389 thread->exec_count[t] = 0;
3394 thread->exec_elapsed = NULL;
3395 thread->exec_count = NULL;
3399 /* all clients must be assigned to a thread */
3400 Assert(nclients_dealt == nclients);
3402 /* get start up time */
3403 INSTR_TIME_SET_CURRENT(start_time);
3405 /* set alarm if duration is specified. */
3410 #ifdef ENABLE_THREAD_SAFETY
3411 for (i = 0; i < nthreads; i++)
3413 TState *thread = &threads[i];
3415 INSTR_TIME_SET_CURRENT(thread->start_time);
3417 /* the first thread (i = 0) is executed by main thread */
3420 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3422 if (err != 0 || thread->thread == INVALID_THREAD)
3424 fprintf(stderr, "could not create thread: %s\n", strerror(err));
3430 thread->thread = INVALID_THREAD;
3434 INSTR_TIME_SET_CURRENT(threads[0].start_time);
3435 threads[0].thread = INVALID_THREAD;
3436 #endif /* ENABLE_THREAD_SAFETY */
3438 /* wait for threads and accumulate results */
3439 INSTR_TIME_SET_ZERO(conn_total_time);
3440 for (i = 0; i < nthreads; i++)
3442 TState *thread = &threads[i];
3445 #ifdef ENABLE_THREAD_SAFETY
3446 if (threads[i].thread == INVALID_THREAD)
3447 /* actually run this thread directly in the main thread */
3448 (void) threadRun(thread);
3450 /* wait for other threads. should check that 0 is returned? */
3451 pthread_join(thread->thread, NULL);
3453 (void) threadRun(thread);
3454 #endif /* ENABLE_THREAD_SAFETY */
3456 /* thread level stats */
3457 throttle_lag += thread->throttle_lag;
3458 throttle_latency_skipped += threads->throttle_latency_skipped;
3459 latency_late += thread->latency_late;
3460 if (throttle_lag_max > thread->throttle_lag_max)
3461 throttle_lag_max = thread->throttle_lag_max;
3462 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
3464 /* client-level stats */
3465 for (j = 0; j < thread->nstate; j++)
3467 total_xacts += thread->state[j].cnt;
3468 total_latencies += thread->state[j].txn_latencies;
3469 total_sqlats += thread->state[j].txn_sqlats;
3472 disconnect_all(state, nclients);
3475 * XXX We compute results as though every client of every thread started
3476 * and finished at the same time. That model can diverge noticeably from
3477 * reality for a short benchmark run involving relatively many threads.
3478 * The first thread may process notably many transactions before the last
3479 * thread begins. Improving the model alone would bring limited benefit,
3480 * because performance during those periods of partial thread count can
3481 * easily exceed steady state performance. This is one of the many ways
3482 * short runs convey deceptive performance figures.
3484 INSTR_TIME_SET_CURRENT(total_time);
3485 INSTR_TIME_SUBTRACT(total_time, start_time);
3486 printResults(ttype, total_xacts, nclients, threads, nthreads,
3487 total_time, conn_total_time, total_latencies, total_sqlats,
3488 throttle_lag, throttle_lag_max, throttle_latency_skipped,
/*
 * Thread entry point.  `arg` is the TState for this thread; the clients it
 * drives are the `nstate` CState entries starting at thread->state.
 *
 * Visible flow: reset the throttling fields and conn_time, build the
 * per-thread log path, connect every client with doConnect(), start each
 * client with doCustom(), then wait in select() on the client sockets and
 * call doCustom() again for clients that have input or whose current command
 * is a meta-command.  The thread with tid 0 also prints progress reports to
 * stderr.  Finally all connections are closed with disconnect_all() and the
 * time that took is added to thread->conn_time.
 */
3495 threadRun(void *arg)
3497 TState *thread = (TState *) arg;
3498 CState *state = thread->state;
3499 FILE *logfile = NULL; /* per-thread log file */
3502 int nstate = thread->nstate;
3503 int remains = nstate; /* number of remaining clients */
3506 /* for reporting progress: */
3507 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3508 int64 last_report = thread_start;
3509 int64 next_report = last_report + (int64) progress * 1000000;
3510 int64 last_count = 0,
3519 * Initialize throttling rate target for all of the thread's clients. It
3520 * might be a little more accurate to reset thread->start_time here too.
3521 * The possible drift seems too small relative to typical throttle delay
3522 * times to worry about it.
3524 INSTR_TIME_SET_CURRENT(start);
3525 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3526 thread->throttle_lag = 0;
3527 thread->throttle_lag_max = 0;
3529 INSTR_TIME_SET_ZERO(thread->conn_time);
3531 /* open log file if requested */
/* tid 0 logs to pgbench_log.<main_pid>; the other format appends ".<tid>" */
3536 if (thread->tid == 0)
3537 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3539 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3540 logfile = fopen(logpath, "w");
3542 if (logfile == NULL)
3544 fprintf(stderr, "could not open logfile \"%s\": %s\n",
3545 logpath, strerror(errno));
3552 /* make connections to the database */
3553 for (i = 0; i < nstate; i++)
3555 if ((state[i].con = doConnect()) == NULL)
3560 /* time after thread and connections set up */
3561 INSTR_TIME_SET_CURRENT(thread->conn_time);
3562 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
3564 agg_vals_init(&aggs, thread->start_time);
3566 /* send start up queries in async manner */
3567 for (i = 0; i < nstate; i++)
3569 CState *st = &state[i];
/*
 * NOTE(review): `commands` is read from sql_files[st->use_file] before
 * st->use_file is reassigned by getrand() below, so the meta-command test
 * after doCustom() looks at the previously selected script -- confirm this
 * is intended.
 */
3570 Command **commands = sql_files[st->use_file];
3571 int prev_ecnt = st->ecnt;
3573 st->use_file = getrand(thread, 0, num_files - 1);
3574 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3575 remains--; /* I've aborted */
3577 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3579 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3581 remains--; /* I've aborted */
3590 int maxsock; /* max socket number to be waited */
3594 FD_ZERO(&input_mask);
/*
 * min_usec ends up as the shortest wait needed by any client;
 * PG_INT64_MAX serves as "no deadline" (see the select() calls below).
 */
3597 min_usec = PG_INT64_MAX;
3598 for (i = 0; i < nstate; i++)
3600 CState *st = &state[i];
3601 Command **commands = sql_files[st->use_file];
3604 if (st->con == NULL)
3608 else if (st->sleeping)
3610 if (st->throttling && timer_exceeded)
3612 /* interrupt client which has not started a transaction */
3615 st->throttling = false;
3620 else /* just a nap from the script */
3624 if (min_usec == PG_INT64_MAX)
3628 INSTR_TIME_SET_CURRENT(now);
3629 now_usec = INSTR_TIME_GET_MICROSEC(now);
3632 this_usec = st->txn_scheduled - now_usec;
3633 if (min_usec > this_usec)
3634 min_usec = this_usec;
3637 else if (commands[st->state]->type == META_COMMAND)
3639 min_usec = 0; /* the connection is ready to run */
3643 sock = PQsocket(st->con);
3646 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3650 FD_SET(sock, &input_mask);
3656 /* also wake up to print the next progress report on time */
3657 if (progress && min_usec > 0 && thread->tid == 0)
3659 /* get current time if needed */
3664 INSTR_TIME_SET_CURRENT(now);
3665 now_usec = INSTR_TIME_GET_MICROSEC(now);
3668 if (now_usec >= next_report)
3670 else if ((next_report - now_usec) < min_usec)
3671 min_usec = next_report - now_usec;
3675 * Sleep until we receive data from the server, or a nap-time
3676 * specified in the script ends, or it's time to print a progress
3679 if (min_usec > 0 && maxsock != -1)
3681 int nsocks; /* return from select(2) */
3683 if (min_usec != PG_INT64_MAX)
3685 struct timeval timeout;
/* split the microsecond wait into seconds + microseconds for select() */
3687 timeout.tv_sec = min_usec / 1000000;
3688 timeout.tv_usec = min_usec % 1000000;
3689 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3692 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3697 /* must be something wrong */
3698 fprintf(stderr, "select() failed: %s\n", strerror(errno));
3703 /* ok, backend returns reply */
3704 for (i = 0; i < nstate; i++)
3706 CState *st = &state[i];
3707 Command **commands = sql_files[st->use_file];
3708 int prev_ecnt = st->ecnt;
/*
 * Advance a connected client when its socket is readable or when its
 * current command is a meta-command.
 */
3710 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3711 || commands[st->state]->type == META_COMMAND))
3713 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3714 remains--; /* I've aborted */
3717 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3719 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3721 remains--; /* I've aborted */
3727 /* progress report by thread 0 for all threads */
3728 if (progress && thread->tid == 0)
3730 instr_time now_time;
3733 INSTR_TIME_SET_CURRENT(now_time);
3734 now = INSTR_TIME_GET_MICROSEC(now_time);
3735 if (now >= next_report)
3737 /* generate and show report */
3743 int64 run = now - last_report;
3752 * Add up the statistics of all threads.
3754 * XXX: No locking. There is no guarantee that we get an
3755 * atomic snapshot of the transaction count and latencies, so
3756 * these figures can well be off by a small amount. The
3757 * progress is report's purpose is to give a quick overview of
3758 * how the test is going, so that shouldn't matter too much.
3759 * (If a read from a 64-bit integer is not atomic, you might
3760 * get a "torn" read and completely bogus latencies though!)
/*
 * NOTE(review): state[] and thread[] are indexed from this thread's own
 * pointers up to the global client/thread counts; presumably that relies
 * on tid 0 owning the first element of both arrays -- confirm.
 */
3762 for (i = 0; i < progress_nclients; i++)
3764 count += state[i].cnt;
3765 lats += state[i].txn_latencies;
3766 sqlats += state[i].txn_sqlats;
3769 for (i = 0; i < progress_nthreads; i++)
3771 skipped += thread[i].throttle_latency_skipped;
3772 lags += thread[i].throttle_lag;
/*
 * tps, latency, sqlat and lag are computed from the deltas since the
 * previous report.  The 0.001 factors presumably convert microseconds to
 * the milliseconds printed below.  NOTE(review): count - last_count is
 * zero when no transaction finished in the interval; the divisions are
 * floating-point, so the printed values would be non-finite -- confirm
 * that is acceptable.
 */
3775 total_run = (now - thread_start) / 1000000.0;
3776 tps = 1000000.0 * (count - last_count) / run;
3777 latency = 0.001 * (lats - last_lats) / (count - last_count);
3778 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3779 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3780 lag = 0.001 * (lags - last_lags) / (count - last_count);
3783 "progress: %.1f s, %.1f tps, "
3784 "lat %.3f ms stddev %.3f",
3785 total_run, tps, latency, stdev);
3788 fprintf(stderr, ", lag %.3f ms", lag);
3790 fprintf(stderr, ", " INT64_FORMAT " skipped",
3791 skipped - last_skipped);
3793 fprintf(stderr, "\n");
3797 last_sqlats = sqlats;
3800 last_skipped = skipped;
3803 * Ensure that the next report is in the future, in case
3804 * pgbench/postgres got stuck somewhere.
3808 next_report += (int64) progress *1000000;
3809 } while (now >= next_report);
/* time the disconnects and add that to this thread's conn_time */
3815 INSTR_TIME_SET_CURRENT(start);
3816 disconnect_all(state, nstate);
3817 INSTR_TIME_SET_CURRENT(end);
3818 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
3825 * Support for duration option: set timer_exceeded after so many seconds.
/*
 * Signal handler registered for SIGALRM by setalarm(): records that the
 * timer has fired by setting the timer_exceeded flag.
 */
3831 handle_sig_alarm(SIGNAL_ARGS)
3833 timer_exceeded = true;
/*
 * Signal-based variant of setalarm(): installs handle_sig_alarm as the
 * SIGALRM handler through pqsignal().  `seconds` is the requested delay.
 */
3837 setalarm(int seconds)
3839 pqsignal(SIGALRM, handle_sig_alarm);
/*
 * Windows timer-queue callback handed to CreateTimerQueueTimer() by
 * setalarm(): sets timer_exceeded.  Neither parameter is used here.
 */
3845 static VOID CALLBACK
3846 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3848 timer_exceeded = true;
/*
 * Windows variant of setalarm(): creates a timer queue and a one-shot timer
 * (WT_EXECUTEONLYONCE, period 0) that calls win32_timer_callback after
 * seconds * 1000 milliseconds.
 *
 * "failed to set timer" is reported when `seconds` exceeds
 * ((DWORD) -1) / 1000, i.e. the millisecond value would not fit in a DWORD,
 * or when CreateTimerQueueTimer() fails.
 */
3852 setalarm(int seconds)
3857 /* This function will be called at most once, so we can cheat a bit. */
3858 queue = CreateTimerQueue();
3859 if (seconds > ((DWORD) -1) / 1000 ||
3860 !CreateTimerQueueTimer(&timer, queue,
3861 win32_timer_callback, NULL, seconds * 1000, 0,
3862 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3864 fprintf(stderr, "failed to set timer\n");
3869 /* partial pthread implementation for Windows */
/*
 * Per-thread bookkeeping for the Windows pthread emulation.  `routine` is
 * the start function supplied to pthread_create(); win32_pthread_run()
 * invokes it.
 */
3871 typedef struct win32_pthread
3874 void *(*routine) (void *);
/*
 * Thread start function given to _beginthreadex() by pthread_create().
 * `arg` is the win32_pthread for the new thread; the user routine is called
 * with th->arg and its return value is saved in th->result.
 */
3879 static unsigned __stdcall
3880 win32_pthread_run(void *arg)
3882 win32_pthread *th = (win32_pthread *) arg;
3884 th->result = th->routine(th->arg);
/*
 * Windows emulation of pthread_create(): allocates a win32_pthread with
 * pg_malloc(), records start_routine in it, and starts a thread with
 * _beginthreadex() that runs win32_pthread_run() on that structure.
 * A NULL handle from _beginthreadex() is treated as failure.
 */
3890 pthread_create(pthread_t *thread,
3891 pthread_attr_t *attr,
3892 void *(*start_routine) (void *),
3898 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3899 th->routine = start_routine;
3903 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3904 if (th->handle == NULL)
3916 pthread_join(pthread_t th, void **thread_return)
3918 if (th == NULL || th->handle == NULL)
3919 return errno = EINVAL;
3921 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3923 _dosmaperr(GetLastError());
3928 *thread_return = th->result;
3930 CloseHandle(th->handle);