4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2015, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define M_PI 3.14159265358979323846
58 #define ERRCODE_UNDEFINED_TABLE "42P01"
61 * Multi-platform pthread implementations
65 /* Use native win32 threads on Windows */
66 typedef struct win32_pthread *pthread_t;
67 typedef int pthread_attr_t;
69 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
70 static int pthread_join(pthread_t th, void **thread_return);
71 #elif defined(ENABLE_THREAD_SAFETY)
72 /* Use platform-dependent pthread capability */
75 /* No threads implementation, use none (-j 1) */
76 #define pthread_t void *
80 /********************************************************************
81 * some configurable parameters */
83 /* max number of clients allowed */
85 #define MAXCLIENTS (FD_SETSIZE - 10)
87 #define MAXCLIENTS 1024
90 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
91 #define DEFAULT_NXACTS 10 /* default nxacts */
93 #define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */
95 int nxacts = 0; /* number of transactions per client */
96 int duration = 0; /* duration in seconds */
99 * scaling factor. for example, scale = 10 will make 1000000 tuples in
100 * pgbench_accounts table.
105 * fillfactor. for example, fillfactor = 90 will use only 90 percent
106 * space during inserts and leave 10 percent free.
108 int fillfactor = 100;
111 * create foreign key constraints on the tables?
113 int foreign_keys = 0;
116 * use unlogged tables?
118 int unlogged_tables = 0;
121 * log sampling rate (1.0 = log everything, 0.0 = option not given)
123 double sample_rate = 0.0;
126 * When threads are throttled to a given rate limit, this is the target delay
127 * to reach that rate in usec. 0 is the default and means no throttling.
129 int64 throttle_delay = 0;
132 * Transactions which take longer than this limit (in usec) are counted as
133 * late, and reported as such, although they are completed anyway. When
134 * throttling is enabled, execution time slots that are more than this late
135 * are skipped altogether, and counted separately.
137 int64 latency_limit = 0;
140 * tablespace selection
142 char *tablespace = NULL;
143 char *index_tablespace = NULL;
146 * end of configurable parameters
147 *********************************************************************/
149 #define nbranches 1 /* Makes little sense to change this. Change
152 #define naccounts 100000
155 * The scale factor at/beyond which 32bit integers are incapable of storing
158 * Although the actual threshold is 21474, we use 20000 because it is easier to
159 * document and remember, and isn't that far away from the real threshold.
161 #define SCALE_32BIT_THRESHOLD 20000
163 bool use_log; /* log transaction latencies to a file */
164 bool use_quiet; /* quiet logging onto stderr */
165 int agg_interval; /* log aggregates instead of individual
167 int progress = 0; /* thread progress report every this seconds */
168 bool progress_timestamp = false; /* progress report with Unix time */
169 int progress_nclients = 0; /* number of clients for progress
171 int progress_nthreads = 0; /* number of threads for progress
173 bool is_connect; /* establish connection for each transaction */
174 bool is_latencies; /* report per-command latencies */
175 int main_pid; /* main process id used in log filename */
181 const char *progname;
183 volatile bool timer_exceeded = false; /* flag from signal handler */
185 /* variable definitions */
188 char *name; /* variable name */
189 char *value; /* its value */
192 #define MAX_FILES 128 /* max number of SQL script files allowed */
193 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
196 * structures used in custom query mode
201 PGconn *con; /* connection handle to DB */
202 int id; /* client No. */
203 int state; /* state No. */
204 int listen; /* 0 indicates that an async query has been
206 int sleeping; /* 1 indicates that the client is napping */
207 bool throttling; /* whether nap is for throttling */
208 Variable *variables; /* array of variable definitions */
210 int64 txn_scheduled; /* scheduled start time of transaction (usec) */
211 instr_time txn_begin; /* used for measuring schedule lag times */
212 instr_time stmt_begin; /* used for measuring statement latencies */
213 bool is_throttled; /* whether transaction throttling is done */
214 int use_file; /* index in sql_files for this client */
215 bool prepared[MAX_FILES];
217 /* per client collected stats */
218 int cnt; /* xacts count */
219 int ecnt; /* error count */
220 int64 txn_latencies; /* cumulated latencies */
221 int64 txn_sqlats; /* cumulated square latencies */
229 int tid; /* thread id */
230 pthread_t thread; /* thread handle */
231 CState *state; /* array of CState */
232 int nstate; /* length of state[] */
233 instr_time start_time; /* thread start time */
234 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
235 int *exec_count; /* number of cmd executions (per Command) */
236 unsigned short random_state[3]; /* separate randomness for each thread */
237 int64 throttle_trigger; /* previous/next throttling (us) */
239 /* per thread collected stats */
240 instr_time conn_time;
241 int64 throttle_lag; /* total transaction lag behind throttling */
242 int64 throttle_lag_max; /* max transaction lag */
243 int64 throttle_latency_skipped; /* lagging transactions
245 int64 latency_late; /* late transactions */
248 #define INVALID_THREAD ((pthread_t) 0)
251 * queries read from files
253 #define SQL_COMMAND 1
254 #define META_COMMAND 2
257 typedef enum QueryMode
259 QUERY_SIMPLE, /* simple query */
260 QUERY_EXTENDED, /* extended query */
261 QUERY_PREPARED, /* extended query with prepared statements */
265 static QueryMode querymode = QUERY_SIMPLE;
266 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
270 char *line; /* full text of command line */
271 int command_num; /* unique index of this Command struct */
272 int type; /* command type (SQL_COMMAND or META_COMMAND) */
273 int argc; /* number of command words */
274 char *argv[MAX_ARGS]; /* command word list */
275 int cols[MAX_ARGS]; /* corresponding column starting from 1 */
276 PgBenchExpr *expr; /* parsed expression */
282 long start_time; /* when does the interval start */
283 int cnt; /* number of transactions */
284 int skipped; /* number of transactions skipped under --rate
285 * and --latency-limit */
287 double min_latency; /* min/max latencies */
289 double sum_latency; /* sum(latency), sum(latency^2) - for
295 double sum_lag; /* sum(lag) */
296 double sum2_lag; /* sum(lag*lag) */
299 static Command **sql_files[MAX_FILES]; /* SQL script files */
300 static int num_files; /* number of script files */
301 static int num_commands = 0; /* total number of Command structs */
302 static int debug = 0; /* debug flag */
304 /* default scenario */
305 static char *tpc_b = {
306 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
307 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
308 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
309 "\\setrandom aid 1 :naccounts\n"
310 "\\setrandom bid 1 :nbranches\n"
311 "\\setrandom tid 1 :ntellers\n"
312 "\\setrandom delta -5000 5000\n"
314 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
315 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
316 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
317 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
318 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
323 static char *simple_update = {
324 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
325 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
326 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
327 "\\setrandom aid 1 :naccounts\n"
328 "\\setrandom bid 1 :nbranches\n"
329 "\\setrandom tid 1 :ntellers\n"
330 "\\setrandom delta -5000 5000\n"
332 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
333 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
334 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
339 static char *select_only = {
340 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
341 "\\setrandom aid 1 :naccounts\n"
342 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
345 /* Function prototypes */
346 static void setalarm(int seconds);
347 static void *threadRun(void *arg);
349 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
350 AggVals *agg, bool skipped);
355 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
357 " %s [OPTION]... [DBNAME]\n"
358 "\nInitialization options:\n"
359 " -i, --initialize invokes initialization mode\n"
360 " -F, --fillfactor=NUM set fill factor\n"
361 " -n, --no-vacuum do not run VACUUM after initialization\n"
362 " -q, --quiet quiet logging (one message each 5 seconds)\n"
363 " -s, --scale=NUM scaling factor\n"
364 " --foreign-keys create foreign key constraints between tables\n"
365 " --index-tablespace=TABLESPACE\n"
366 " create indexes in the specified tablespace\n"
367 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
368 " --unlogged-tables create tables as unlogged tables\n"
369 "\nBenchmarking options:\n"
370 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
371 " -C, --connect establish new connection for each transaction\n"
372 " -D, --define=VARNAME=VALUE\n"
373 " define variable for use by custom script\n"
374 " -f, --file=FILENAME read transaction script from FILENAME\n"
375 " -j, --jobs=NUM number of threads (default: 1)\n"
376 " -l, --log write transaction times to log file\n"
377 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
378 " -M, --protocol=simple|extended|prepared\n"
379 " protocol for submitting queries (default: simple)\n"
380 " -n, --no-vacuum do not run VACUUM before tests\n"
381 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
382 " -P, --progress=NUM show thread progress report every NUM seconds\n"
383 " -r, --report-latencies report average latency per command\n"
384 " -R, --rate=NUM target rate in transactions per second\n"
385 " -s, --scale=NUM report this scale factor in output\n"
386 " -S, --select-only perform SELECT-only transactions\n"
387 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
388 " -T, --time=NUM duration of benchmark test in seconds\n"
389 " -v, --vacuum-all vacuum all four standard tables before tests\n"
390 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
391 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
392 " --progress-timestamp use Unix epoch timestamps for progress\n"
393 "\nCommon options:\n"
394 " -d, --debug print debugging output\n"
395 " -h, --host=HOSTNAME database server host or socket directory\n"
396 " -p, --port=PORT database server port number\n"
397 " -U, --username=USERNAME connect as specified database user\n"
398 " -V, --version output version information, then exit\n"
399 " -?, --help show this help, then exit\n"
401 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
406 * strtoint64 -- convert a string to 64-bit integer
408 * This function is a modified version of scanint8() from
409 * src/backend/utils/adt/int8.c.
412 strtoint64(const char *str)
414 const char *ptr = str;
419 * Do our own scan, rather than relying on sscanf which might be broken
423 /* skip leading spaces */
424 while (*ptr && isspace((unsigned char) *ptr))
433 * Do an explicit check for INT64_MIN. Ugly though this is, it's
434 * cleaner than trying to get the loop below to handle it portably.
436 if (strncmp(ptr, "9223372036854775808", 19) == 0)
438 result = PG_INT64_MIN;
444 else if (*ptr == '+')
447 /* require at least one digit */
448 if (!isdigit((unsigned char) *ptr))
449 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
452 while (*ptr && isdigit((unsigned char) *ptr))
454 int64 tmp = result * 10 + (*ptr++ - '0');
456 if ((tmp / 10) != result) /* overflow? */
457 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
463 /* allow trailing whitespace, but not other trailing chars */
464 while (*ptr != '\0' && isspace((unsigned char) *ptr))
468 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
470 return ((sign < 0) ? -result : result);
473 /* random number generator: uniform distribution from min to max inclusive */
475 getrand(TState *thread, int64 min, int64 max)
478 * Odd coding is so that min and max have approximately the same chance of
479 * being selected as do numbers between them.
481 * pg_erand48() is thread-safe and concurrent, which is why we use it
482 * rather than random(), which in glibc is non-reentrant, and therefore
483 * protected by a mutex, and therefore a bottleneck on machines with many
486 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
490 * random number generator: exponential distribution from min to max inclusive.
491 * the threshold is so that the density of probability for the last cut-off max
492 * value is exp(-threshold).
495 getExponentialRand(TState *thread, int64 min, int64 max, double threshold)
501 Assert(threshold > 0.0);
502 cut = exp(-threshold);
503 /* erand in [0, 1), uniform in (0, 1] */
504 uniform = 1.0 - pg_erand48(thread->random_state);
507 * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1)
509 Assert((1.0 - cut) != 0.0);
510 rand = -log(cut + (1.0 - cut) * uniform) / threshold;
511 /* return int64 random number within between min and max */
512 return min + (int64) ((max - min + 1) * rand);
515 /* random number generator: gaussian distribution from min to max inclusive */
517 getGaussianRand(TState *thread, int64 min, int64 max, double threshold)
523 * Get user specified random number from this loop, with -threshold <
526 * This loop is executed until the number is in the expected range.
528 * As the minimum threshold is 2.0, the probability of looping is low:
529 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
530 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
531 * the worst case. For a 5.0 threshold value, the looping probability is
532 * about e^{-5} * 2 / pi ~ 0.43%.
537 * pg_erand48 generates [0,1), but for the basic version of the
538 * Box-Muller transform the two uniformly distributed random numbers
539 * are expected in (0, 1] (see
540 * http://en.wikipedia.org/wiki/Box_muller)
542 double rand1 = 1.0 - pg_erand48(thread->random_state);
543 double rand2 = 1.0 - pg_erand48(thread->random_state);
545 /* Box-Muller basic form transform */
546 double var_sqrt = sqrt(-2.0 * log(rand1));
548 stdev = var_sqrt * sin(2.0 * M_PI * rand2);
551 * we may try with cos, but there may be a bias induced if the
552 * previous value fails the test. To be on the safe side, let us try
556 while (stdev < -threshold || stdev >= threshold);
558 /* stdev is in [-threshold, threshold), normalization to [0,1) */
559 rand = (stdev + threshold) / (threshold * 2.0);
561 /* return int64 random number within between min and max */
562 return min + (int64) ((max - min + 1) * rand);
566 * random number generator: generate a value, such that the series of values
567 * will approximate a Poisson distribution centered on the given value.
570 getPoissonRand(TState *thread, int64 center)
573 * Use inverse transform sampling to generate a value > 0, such that the
574 * expected (i.e. average) value is the given argument.
578 /* erand in [0, 1), uniform in (0, 1] */
579 uniform = 1.0 - pg_erand48(thread->random_state);
581 return (int64) (-log(uniform) * ((double) center) + 0.5);
584 /* call PQexec() and exit() on failure */
586 executeStatement(PGconn *con, const char *sql)
590 res = PQexec(con, sql);
591 if (PQresultStatus(res) != PGRES_COMMAND_OK)
593 fprintf(stderr, "%s", PQerrorMessage(con));
599 /* call PQexec() and complain, but without exiting, on failure */
601 tryExecuteStatement(PGconn *con, const char *sql)
605 res = PQexec(con, sql);
606 if (PQresultStatus(res) != PGRES_COMMAND_OK)
608 fprintf(stderr, "%s", PQerrorMessage(con));
609 fprintf(stderr, "(ignoring this error and continuing anyway)\n");
614 /* set up a connection to the backend */
619 static char *password = NULL;
623 * Start the connection. Loop until we have a password if requested by
628 #define PARAMS_ARRAY_SIZE 7
630 const char *keywords[PARAMS_ARRAY_SIZE];
631 const char *values[PARAMS_ARRAY_SIZE];
633 keywords[0] = "host";
635 keywords[1] = "port";
637 keywords[2] = "user";
639 keywords[3] = "password";
640 values[3] = password;
641 keywords[4] = "dbname";
643 keywords[5] = "fallback_application_name";
644 values[5] = progname;
650 conn = PQconnectdbParams(keywords, values, true);
654 fprintf(stderr, "connection to database \"%s\" failed\n",
659 if (PQstatus(conn) == CONNECTION_BAD &&
660 PQconnectionNeedsPassword(conn) &&
664 password = simple_prompt("Password: ", 100, false);
669 /* check to see that the backend connection was successfully made */
670 if (PQstatus(conn) == CONNECTION_BAD)
672 fprintf(stderr, "connection to database \"%s\" failed:\n%s",
673 dbName, PQerrorMessage(conn));
681 /* throw away response from backend */
683 discard_response(CState *state)
689 res = PQgetResult(state->con);
696 compareVariables(const void *v1, const void *v2)
698 return strcmp(((const Variable *) v1)->name,
699 ((const Variable *) v2)->name);
703 getVariable(CState *st, char *name)
708 /* On some versions of Solaris, bsearch of zero items dumps core */
709 if (st->nvariables <= 0)
713 var = (Variable *) bsearch((void *) &key,
714 (void *) st->variables,
/*
 * Check whether name is usable as a variable name: it must be non-empty and
 * consist only of alphanumeric characters and underscores.
 *
 * Returns true if the name is legal, false otherwise.
 */
static bool
isLegalVariableName(const char *name)
{
	const char *p;

	/* a zero-length name is not a usable identifier */
	if (name[0] == '\0')
		return false;

	for (p = name; *p != '\0'; p++)
	{
		/* <ctype.h> functions need a value representable as unsigned char */
		if (!isalnum((unsigned char) *p) && *p != '_')
			return false;
	}

	return true;
}
740 putVariable(CState *st, const char *context, char *name, char *value)
746 /* On some versions of Solaris, bsearch of zero items dumps core */
747 if (st->nvariables > 0)
748 var = (Variable *) bsearch((void *) &key,
749 (void *) st->variables,
761 * Check for the name only when declaring a new variable to avoid
764 if (!isLegalVariableName(name))
766 fprintf(stderr, "%s: invalid variable name: \"%s\"\n",
772 newvars = (Variable *) pg_realloc(st->variables,
773 (st->nvariables + 1) * sizeof(Variable));
775 newvars = (Variable *) pg_malloc(sizeof(Variable));
777 st->variables = newvars;
779 var = &newvars[st->nvariables];
781 var->name = pg_strdup(name);
782 var->value = pg_strdup(value);
786 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
793 /* dup then free, in case value is pointing at this variable */
794 val = pg_strdup(value);
804 parseVariable(const char *sql, int *eaten)
812 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
817 memcpy(name, &sql[1], i - 1);
825 replaceVariable(char **sql, char *param, int len, char *value)
827 int valueln = strlen(value);
831 size_t offset = param - *sql;
833 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
834 param = *sql + offset;
838 memmove(param + valueln, param + len, strlen(param + len) + 1);
839 memcpy(param, value, valueln);
841 return param + valueln;
845 assignVariables(CState *st, char *sql)
852 while ((p = strchr(p, ':')) != NULL)
856 name = parseVariable(p, &eaten);
866 val = getVariable(st, name);
874 p = replaceVariable(&sql, p, eaten, val);
881 getQueryParams(CState *st, const Command *command, const char **params)
885 for (i = 0; i < command->argc - 1; i++)
886 params[i] = getVariable(st, command->argv[i + 1]);
890 * Recursive evaluation of an expression in a pgbench script
891 * using the current state of variables.
892 * Returns whether the evaluation was ok,
893 * the value itself is returned through the retval pointer.
896 evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval)
900 case ENODE_INTEGER_CONSTANT:
902 *retval = expr->u.integer_constant.ival;
910 if ((var = getVariable(st, expr->u.variable.varname)) == NULL)
912 fprintf(stderr, "undefined variable \"%s\"\n",
913 expr->u.variable.varname);
916 *retval = strtoint64(var);
925 if (!evaluateExpr(st, expr->u.operator.lexpr, &lval))
927 if (!evaluateExpr(st, expr->u.operator.rexpr, &rval))
929 switch (expr->u.operator.operator)
932 *retval = lval + rval;
936 *retval = lval - rval;
940 *retval = lval * rval;
946 fprintf(stderr, "division by zero\n");
949 *retval = lval / rval;
955 fprintf(stderr, "division by zero\n");
958 *retval = lval % rval;
962 fprintf(stderr, "bad operator\n");
970 fprintf(stderr, "bad expression\n");
975 * Run a shell command. The result is assigned to the variable if not NULL.
976 * Return true if succeeded, or false on error.
979 runShellCommand(CState *st, char *variable, char **argv, int argc)
981 char command[SHELL_COMMAND_SIZE];
990 * Join arguments with whitespace separators. Arguments starting with
991 * exactly one colon are treated as variables:
992 * name - append a string "name"
993 * :var - append a variable named 'var'
994 * ::name - append a string ":name"
997 for (i = 0; i < argc; i++)
1002 if (argv[i][0] != ':')
1004 arg = argv[i]; /* a string literal */
1006 else if (argv[i][1] == ':')
1008 arg = argv[i] + 1; /* a string literal starting with colons */
1010 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
1012 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1017 arglen = strlen(arg);
1018 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
1020 fprintf(stderr, "%s: shell command is too long\n", argv[0]);
1025 command[len++] = ' ';
1026 memcpy(command + len, arg, arglen);
1030 command[len] = '\0';
1032 /* Fast path for non-assignment case */
1033 if (variable == NULL)
1035 if (system(command))
1037 if (!timer_exceeded)
1038 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1044 /* Execute the command with pipe and read the standard output. */
1045 if ((fp = popen(command, "r")) == NULL)
1047 fprintf(stderr, "%s: could not launch shell command\n", argv[0]);
1050 if (fgets(res, sizeof(res), fp) == NULL)
1052 if (!timer_exceeded)
1053 fprintf(stderr, "%s: could not read result of shell command\n", argv[0]);
1059 fprintf(stderr, "%s: could not close shell command\n", argv[0]);
1063 /* Check whether the result is an integer and assign it to the variable */
1064 retval = (int) strtol(res, &endptr, 10);
1065 while (*endptr != '\0' && isspace((unsigned char) *endptr))
1067 if (*res == '\0' || *endptr != '\0')
1069 fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n",
1073 snprintf(res, sizeof(res), "%d", retval);
1074 if (!putVariable(st, "setshell", variable, res))
1078 printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res);
#define MAX_PREPARE_NAME		32

/*
 * Build the name of the prepared statement for command number "state" of
 * script file number "file", in the form "P<file>_<state>".
 *
 * buffer must have room for at least MAX_PREPARE_NAME bytes.  The write is
 * bounded by that size, so the result is always NUL-terminated and never
 * overruns the buffer.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
1091 clientDone(CState *st, bool ok)
1093 (void) ok; /* unused */
1095 if (st->con != NULL)
1100 return false; /* always false */
1104 agg_vals_init(AggVals *aggs, instr_time start)
1106 /* basic counters */
1107 aggs->cnt = 0; /* number of transactions (includes skipped) */
1108 aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
1110 aggs->sum_latency = 0; /* SUM(latency) */
1111 aggs->sum2_latency = 0; /* SUM(latency*latency) */
1113 /* min and max transaction duration */
1114 aggs->min_latency = 0;
1115 aggs->max_latency = 0;
1117 /* schedule lag counters */
1123 /* start of the current interval */
1124 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
1127 /* return false iff client should be disconnected */
1129 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
1133 bool trans_needs_throttle = false;
1137 * gettimeofday() isn't free, so we get the current timestamp lazily the
1138 * first time it's needed, and reuse the same value throughout this
1139 * function after that. This also ensures that e.g. the calculated latency
1140 * reported in the log file and in the totals are the same. Zero means
1141 * "not set yet". Reset "now" when we step to the next command with "goto
1145 INSTR_TIME_SET_ZERO(now);
1147 commands = sql_files[st->use_file];
1150 * Handle throttling once per transaction by sleeping. It is simpler to
1151 * do this here rather than at the end, because so much complicated logic
1152 * happens below when statements finish.
1154 if (throttle_delay && !st->is_throttled)
1157 * Generate a delay such that the series of delays will approximate a
1158 * Poisson distribution centered on the throttle_delay time.
1160 * If transactions are too slow or a given wait is shorter than a
1161 * transaction, the next transaction will start right away.
1163 int64 wait = getPoissonRand(thread, throttle_delay);
1165 thread->throttle_trigger += wait;
1166 st->txn_scheduled = thread->throttle_trigger;
1169 * If --latency-limit is used, and this slot is already late so
1170 * that the transaction will miss the latency limit even if it
1171 * completed immediately, we skip this time slot and iterate till the
1172 * next slot that isn't late yet.
1178 if (INSTR_TIME_IS_ZERO(now))
1179 INSTR_TIME_SET_CURRENT(now);
1180 now_us = INSTR_TIME_GET_MICROSEC(now);
1181 while (thread->throttle_trigger < now_us - latency_limit)
1183 thread->throttle_latency_skipped++;
1186 doLog(thread, st, logfile, &now, agg, true);
1188 wait = getPoissonRand(thread, throttle_delay);
1189 thread->throttle_trigger += wait;
1190 st->txn_scheduled = thread->throttle_trigger;
1195 st->throttling = true;
1196 st->is_throttled = true;
1198 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
1203 { /* are we sleeping? */
1206 if (INSTR_TIME_IS_ZERO(now))
1207 INSTR_TIME_SET_CURRENT(now);
1208 now_us = INSTR_TIME_GET_MICROSEC(now);
1209 if (st->txn_scheduled <= now_us)
1211 st->sleeping = 0; /* Done sleeping, go ahead with next command */
1214 /* Measure lag of throttled transaction relative to target */
1215 int64 lag = now_us - st->txn_scheduled;
1217 thread->throttle_lag += lag;
1218 if (lag > thread->throttle_lag_max)
1219 thread->throttle_lag_max = lag;
1220 st->throttling = false;
1224 return true; /* Still sleeping, nothing to do here */
1228 { /* are we receiver? */
1229 if (commands[st->state]->type == SQL_COMMAND)
1232 fprintf(stderr, "client %d receiving\n", st->id);
1233 if (!PQconsumeInput(st->con))
1234 { /* there's something wrong */
1235 fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
1236 return clientDone(st, false);
1238 if (PQisBusy(st->con))
1239 return true; /* don't have the whole result yet */
1243 * command finished: accumulate per-command execution times in
1244 * thread-local data structure, if per-command latencies are requested
1248 int cnum = commands[st->state]->command_num;
1250 if (INSTR_TIME_IS_ZERO(now))
1251 INSTR_TIME_SET_CURRENT(now);
1252 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1253 now, st->stmt_begin);
1254 thread->exec_count[cnum]++;
1257 /* transaction finished: calculate latency and log the transaction */
1258 if (commands[st->state + 1] == NULL)
1260 /* only calculate latency if an option is used that needs it */
1261 if (progress || throttle_delay || latency_limit)
1265 if (INSTR_TIME_IS_ZERO(now))
1266 INSTR_TIME_SET_CURRENT(now);
1268 latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
1270 st->txn_latencies += latency;
1273 * XXX In a long benchmark run of high-latency transactions,
1274 * this int64 addition eventually overflows. For example, 100
1275 * threads running 10s transactions will overflow it in 2.56
1276 * hours. With a more-typical OLTP workload of .1s
1277 * transactions, overflow would take 256 hours.
1279 st->txn_sqlats += latency * latency;
1281 /* record over the limit transactions if needed. */
1282 if (latency_limit && latency > latency_limit)
1283 thread->latency_late++;
1286 /* record the time it took in the log */
1288 doLog(thread, st, logfile, &now, agg, false);
1291 if (commands[st->state]->type == SQL_COMMAND)
1294 * Read and discard the query result; note this is not included in
1295 * the statement latency numbers.
1297 res = PQgetResult(st->con);
1298 switch (PQresultStatus(res))
1300 case PGRES_COMMAND_OK:
1301 case PGRES_TUPLES_OK:
1304 fprintf(stderr, "client %d aborted in state %d: %s",
1305 st->id, st->state, PQerrorMessage(st->con));
1307 return clientDone(st, false);
1310 discard_response(st);
1313 if (commands[st->state + 1] == NULL)
1322 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1323 return clientDone(st, true); /* exit success */
1326 /* increment state counter */
1328 if (commands[st->state] == NULL)
1331 st->use_file = (int) getrand(thread, 0, num_files - 1);
1332 commands = sql_files[st->use_file];
1333 st->is_throttled = false;
1336 * No transaction is underway anymore, which means there is
1337 * nothing to listen to right now. When throttling rate limits
1338 * are active, a sleep will happen next, as the next transaction
1339 * starts. And then in any case the next SQL command will set
1343 trans_needs_throttle = (throttle_delay > 0);
1347 if (st->con == NULL)
1352 INSTR_TIME_SET_CURRENT(start);
1353 if ((st->con = doConnect()) == NULL)
1355 fprintf(stderr, "client %d aborted while establishing connection\n",
1357 return clientDone(st, false);
1359 INSTR_TIME_SET_CURRENT(end);
1360 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1364 * This ensures that a throttling delay is inserted before proceeding with
1365 * sql commands, after the first transaction. The first transaction
1366 * throttling is performed when first entering doCustom.
1368 if (trans_needs_throttle)
1370 trans_needs_throttle = false;
1374 /* Record transaction start time under logging, progress or throttling */
1375 if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
1377 INSTR_TIME_SET_CURRENT(st->txn_begin);
1380 * When not throttling, this is also the transaction's scheduled start
1383 if (!throttle_delay)
1384 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
1387 /* Record statement start time if per-command latencies are requested */
1389 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1391 if (commands[st->state]->type == SQL_COMMAND)
1393 const Command *command = commands[st->state];
1396 if (querymode == QUERY_SIMPLE)
1400 sql = pg_strdup(command->argv[0]);
1401 sql = assignVariables(st, sql);
1404 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1405 r = PQsendQuery(st->con, sql);
1408 else if (querymode == QUERY_EXTENDED)
1410 const char *sql = command->argv[0];
1411 const char *params[MAX_ARGS];
1413 getQueryParams(st, command, params);
1416 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1417 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1418 NULL, params, NULL, NULL, 0);
1420 else if (querymode == QUERY_PREPARED)
1422 char name[MAX_PREPARE_NAME];
1423 const char *params[MAX_ARGS];
1425 if (!st->prepared[st->use_file])
1429 for (j = 0; commands[j] != NULL; j++)
1432 char name[MAX_PREPARE_NAME];
1434 if (commands[j]->type != SQL_COMMAND)
1436 preparedStatementName(name, st->use_file, j);
1437 res = PQprepare(st->con, name,
1438 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1439 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1440 fprintf(stderr, "%s", PQerrorMessage(st->con));
1443 st->prepared[st->use_file] = true;
1446 getQueryParams(st, command, params);
1447 preparedStatementName(name, st->use_file, st->state);
1450 fprintf(stderr, "client %d sending %s\n", st->id, name);
1451 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1452 params, NULL, NULL, 0);
1454 else /* unknown sql mode */
1460 fprintf(stderr, "client %d could not send %s\n",
1461 st->id, command->argv[0]);
1465 st->listen = 1; /* flags that should be listened */
1467 else if (commands[st->state]->type == META_COMMAND)
1469 int argc = commands[st->state]->argc,
1471 char **argv = commands[st->state]->argv;
1475 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1476 for (i = 1; i < argc; i++)
1477 fprintf(stderr, " %s", argv[i]);
1478 fprintf(stderr, "\n");
1481 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1486 double threshold = 0;
1489 if (*argv[2] == ':')
1491 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1493 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1498 min = strtoint64(var);
1501 min = strtoint64(argv[2]);
1503 if (*argv[3] == ':')
1505 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1507 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1512 max = strtoint64(var);
1515 max = strtoint64(argv[3]);
1519 fprintf(stderr, "%s: \\setrandom maximum is less than minimum\n",
1526 * Generate random number functions need to be able to subtract
1527 * max from min and add one to the result without overflowing.
1528 * Since we know max > min, we can detect overflow just by
1529 * checking for a negative result. But we must check both that the
1530 * subtraction doesn't overflow, and that adding one to the result
1531 * doesn't overflow either.
1533 if (max - min < 0 || (max - min) + 1 < 0)
1535 fprintf(stderr, "%s: \\setrandom range is too large\n",
1541 if (argc == 4 || /* uniform without or with "uniform" keyword */
1542 (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0))
1545 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1547 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1549 else if (argc == 6 &&
1550 ((pg_strcasecmp(argv[4], "gaussian") == 0) ||
1551 (pg_strcasecmp(argv[4], "exponential") == 0)))
1553 if (*argv[5] == ':')
1555 if ((var = getVariable(st, argv[5] + 1)) == NULL)
1557 fprintf(stderr, "%s: invalid threshold number: \"%s\"\n",
1562 threshold = strtod(var, NULL);
1565 threshold = strtod(argv[5], NULL);
1567 if (pg_strcasecmp(argv[4], "gaussian") == 0)
1569 if (threshold < MIN_GAUSSIAN_THRESHOLD)
1571 fprintf(stderr, "gaussian threshold must be at least %f (not \"%s\")\n", MIN_GAUSSIAN_THRESHOLD, argv[5]);
1576 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getGaussianRand(thread, min, max, threshold));
1578 snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold));
1580 else if (pg_strcasecmp(argv[4], "exponential") == 0)
1582 if (threshold <= 0.0)
1584 fprintf(stderr, "exponential threshold must be greater than zero (not \"%s\")\n", argv[5]);
1589 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold));
1591 snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold));
1594 else /* this means an error somewhere in the parsing phase... */
1596 fprintf(stderr, "%s: invalid arguments for \\setrandom\n",
1602 if (!putVariable(st, argv[0], argv[1], res))
1610 else if (pg_strcasecmp(argv[0], "set") == 0)
1613 PgBenchExpr *expr = commands[st->state]->expr;
1616 if (!evaluateExpr(st, expr, &result))
1621 sprintf(res, INT64_FORMAT, result);
1623 if (!putVariable(st, argv[0], argv[1], res))
1631 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1637 if (*argv[1] == ':')
1639 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1641 fprintf(stderr, "%s: undefined variable \"%s\"\n",
1649 usec = atoi(argv[1]);
1653 if (pg_strcasecmp(argv[2], "ms") == 0)
1655 else if (pg_strcasecmp(argv[2], "s") == 0)
1661 INSTR_TIME_SET_CURRENT(now);
1662 st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
1667 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1669 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1671 if (timer_exceeded) /* timeout */
1672 return clientDone(st, true);
1673 else if (!ret) /* on error */
1678 else /* succeeded */
1681 else if (pg_strcasecmp(argv[0], "shell") == 0)
1683 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1685 if (timer_exceeded) /* timeout */
1686 return clientDone(st, true);
1687 else if (!ret) /* on error */
1692 else /* succeeded */
/*
 * doLog -- write the log output for one finished transaction.
 *
 * Visible behavior: when sample_rate is nonzero the entry is subject to
 * random sampling via pg_erand48(); *now is filled with the current time
 * if it is still zero; latency is *now minus st->txn_scheduled and lag is
 * st->txn_begin minus st->txn_scheduled, both in microseconds.  With
 * agg_interval > 0 the values are folded into *agg and one line is printed
 * per elapsed interval, otherwise one raw line is printed per transaction.
 *
 * NOTE(review): this extract omits lines of the function (the end of the
 * parameter list, braces, some conditions).  The "skipped" flag used below
 * is presumably the last parameter -- confirm against the full file.
 */
1702 * print log entry after completing one transaction.
1705 doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
1712 * Skip the log entry if sampling is enabled and this row doesn't belong
1713 * to the random sample.
1715 if (sample_rate != 0.0 &&
1716 pg_erand48(thread->random_state) > sample_rate)
1719 if (INSTR_TIME_IS_ZERO(*now))
1720 INSTR_TIME_SET_CURRENT(*now);
1722 latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
1726 lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
1728 /* should we aggregate the results or not? */
1729 if (agg_interval > 0)
1732 * Are we still in the same interval? If yes, accumulate the values
1733 * (print them otherwise)
1735 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
1741 * there is no latency to record if the transaction was
1748 agg->sum_latency += latency;
1749 agg->sum2_latency += latency * latency;
1751 /* first in this aggregation interval */
1752 if ((agg->cnt == 1) || (latency < agg->min_latency))
1753 agg->min_latency = latency;
1755 if ((agg->cnt == 1) || (latency > agg->max_latency))
1756 agg->max_latency = latency;
1758 /* and the same for schedule lag */
1761 agg->sum_lag += lag;
1762 agg->sum2_lag += lag * lag;
1764 if ((agg->cnt == 1) || (lag < agg->min_lag))
1766 if ((agg->cnt == 1) || (lag > agg->max_lag))
1774 * Loop until we reach the interval of the current transaction
1775 * (and print all the empty intervals in between).
1777 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
1780 * This is a non-Windows branch (thanks to the ifdef in
1781 * usage), so we don't need to handle this in a special way
1784 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
1793 fprintf(logfile, " %.0f %.0f %.0f %.0f",
1799 fprintf(logfile, " %d", agg->skipped);
1801 fputc('\n', logfile);
1803 /* move to the next interval */
1804 agg->start_time = agg->start_time + agg_interval;
1806 /* reset for "no transaction" intervals */
1809 agg->min_latency = 0;
1810 agg->max_latency = 0;
1811 agg->sum_latency = 0;
1812 agg->sum2_latency = 0;
1819 /* reset the values to include only the current transaction. */
1821 agg->skipped = skipped ? 1 : 0;
1822 agg->min_latency = latency;
1823 agg->max_latency = latency;
1824 agg->sum_latency = skipped ? 0.0 : latency;
1825 agg->sum2_latency = skipped ? 0.0 : latency * latency;
1829 agg->sum2_lag = lag * lag;
1834 /* no, print raw transactions */
1837 /* This is more than we really ought to know about instr_time */
1839 fprintf(logfile, "%d %d skipped %d %ld %ld",
1840 st->id, st->cnt, st->use_file,
1841 (long) now->tv_sec, (long) now->tv_usec);
1843 fprintf(logfile, "%d %d %.0f %d %ld %ld",
1844 st->id, st->cnt, latency, st->use_file,
1845 (long) now->tv_sec, (long) now->tv_usec);
1848 /* On Windows, instr_time doesn't provide a timestamp anyway */
1850 fprintf(logfile, "%d %d skipped %d 0 0",
1851 st->id, st->cnt, st->use_file);
1853 fprintf(logfile, "%d %d %.0f %d 0 0",
1854 st->id, st->cnt, latency, st->use_file);
/* lag is appended as an extra column; its guarding condition is not visible in this extract */
1857 fprintf(logfile, " %.0f", lag);
1858 fputc('\n', logfile);
/*
 * disconnect_all -- iterate over the first "length" entries of the client
 * state array, close the libpq connection of an entry with PQfinish() and
 * reset its con pointer to NULL.
 * NOTE(review): the lines between the loop header and PQfinish() are missing
 * from this extract (possibly a check for an open connection) -- confirm.
 */
1862 /* discard connections */
1864 disconnect_all(CState *state, int length)
1868 for (i = 0; i < length; i++)
1872 PQfinish(state[i].con);
1873 state[i].con = NULL;
/*
 * init -- (re)create the pgbench tables and load the initial data set.
 *
 * Steps visible here: drop and create each table listed in DDLs[] (the
 * "bigcols" column definitions are used once scale >= SCALE_32BIT_THRESHOLD),
 * insert the branch and teller rows, bulk-load pgbench_accounts through COPY
 * with periodic progress reports, vacuum the tables, add the primary keys
 * from DDLINDEXes[] and finally the foreign keys from DDLKEYs[].
 *
 * is_no_vacuum: NOTE(review) presumably suppresses the vacuum step; the test
 * itself is not visible in this extract -- confirm against the full file.
 *
 * The total number of account rows is naccounts * scale, which does not fit
 * in a 32-bit int for large scale factors; every use of that product must
 * therefore be computed in int64.
 */
1878 /* create tables and setup data */
1880 init(bool is_no_vacuum)
1883 * The scale factor at/beyond which 32-bit integers are insufficient for
1884 * storing TPC-B account IDs.
1886 * Although the actual threshold is 21474, we use 20000 because it is easier to
1887 * document and remember, and isn't that far away from the real threshold.
1889 #define SCALE_32BIT_THRESHOLD 20000
1892 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1893 * fields in these table declarations were intended to comply with that.
1894 * The pgbench_accounts table complies with that because the "filler"
1895 * column is set to blank-padded empty string. But for all other tables
1896 * the columns default to NULL and so don't actually take any space. We
1897 * could fix that by giving them non-null default values. However, that
1898 * would completely break comparability of pgbench results with prior
1899 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1900 * anyway, we stick with the historical behavior.
1904 const char *table; /* table name */
1905 const char *smcols; /* column decls if accountIDs are 32 bits */
1906 const char *bigcols; /* column decls if accountIDs are 64 bits */
1907 int declare_fillfactor;
1909 static const struct ddlinfo DDLs[] = {
1912 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1913 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1918 "tid int not null,bid int,tbalance int,filler char(84)",
1919 "tid int not null,bid int,tbalance int,filler char(84)",
1924 "aid int not null,bid int,abalance int,filler char(84)",
1925 "aid bigint not null,bid int,abalance int,filler char(84)",
1930 "bid int not null,bbalance int,filler char(88)",
1931 "bid int not null,bbalance int,filler char(88)",
1935 static const char *const DDLINDEXes[] = {
1936 "alter table pgbench_branches add primary key (bid)",
1937 "alter table pgbench_tellers add primary key (tid)",
1938 "alter table pgbench_accounts add primary key (aid)"
1940 static const char *const DDLKEYs[] = {
1941 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1942 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1943 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1944 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1945 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1954 /* used to track elapsed time and estimate of the remaining time */
1959 int log_interval = 1;
1961 if ((con = doConnect()) == NULL)
1964 for (i = 0; i < lengthof(DDLs); i++)
1968 const struct ddlinfo *ddl = &DDLs[i];
1971 /* Remove old table, if it exists. */
1972 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1973 executeStatement(con, buffer);
1975 /* Construct new create table statement. */
1977 if (ddl->declare_fillfactor)
1978 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1979 " with (fillfactor=%d)", fillfactor);
1980 if (tablespace != NULL)
1982 char *escape_tablespace;
1984 escape_tablespace = PQescapeIdentifier(con, tablespace,
1985 strlen(tablespace));
1986 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1987 " tablespace %s", escape_tablespace);
1988 PQfreemem(escape_tablespace);
1991 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1993 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1994 unlogged_tables ? " unlogged" : "",
1995 ddl->table, cols, opts);
1997 executeStatement(con, buffer);
2000 executeStatement(con, "begin");
2002 for (i = 0; i < nbranches * scale; i++)
2004 /* "filler" column defaults to NULL */
2005 snprintf(sql, sizeof(sql),
2006 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
2008 executeStatement(con, sql);
2011 for (i = 0; i < ntellers * scale; i++)
2013 /* "filler" column defaults to NULL */
2014 snprintf(sql, sizeof(sql),
2015 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
2016 i + 1, i / ntellers + 1);
2017 executeStatement(con, sql);
2020 executeStatement(con, "commit");
2023 * fill the pgbench_accounts table with some data
2025 fprintf(stderr, "creating tables...\n");
2027 executeStatement(con, "begin");
2028 executeStatement(con, "truncate pgbench_accounts");
2030 res = PQexec(con, "copy pgbench_accounts from stdin");
2031 if (PQresultStatus(res) != PGRES_COPY_IN)
2033 fprintf(stderr, "%s", PQerrorMessage(con));
2038 INSTR_TIME_SET_CURRENT(start);
2040 for (k = 0; k < (int64) naccounts * scale; k++)
2044 /* "filler" column defaults to blank padded empty string */
2045 snprintf(sql, sizeof(sql),
2046 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
2047 j, k / naccounts + 1, 0);
2048 if (PQputline(con, sql))
2050 fprintf(stderr, "PQputline failed\n");
2055 * If we want to stick with the original logging, print a message each
2056 * 100k inserted rows.
2058 if ((!use_quiet) && (j % 100000 == 0))
2060 INSTR_TIME_SET_CURRENT(diff);
2061 INSTR_TIME_SUBTRACT(diff, start);
2063 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2064 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2066 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2067 j, (int64) naccounts * scale,
2068 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
2069 elapsed_sec, remaining_sec);
2071 /* let's not call the timing for each row, but only each 100 rows */
2072 else if (use_quiet && (j % 100 == 0))
2074 INSTR_TIME_SET_CURRENT(diff);
2075 INSTR_TIME_SUBTRACT(diff, start);
2077 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
2078 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
2080 /* have we reached the next interval (or end)? */
/* the row total is computed in int64, as in the loop bound above: a plain int product overflows at large scale factors */
2081 if ((j == (int64) naccounts * scale) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
2083 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",
2084 j, (int64) naccounts * scale,
2085 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
2087 /* skip to the next interval */
2088 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
2093 if (PQputline(con, "\\.\n"))
2095 fprintf(stderr, "very last PQputline failed\n");
2100 fprintf(stderr, "PQendcopy failed\n");
2103 executeStatement(con, "commit");
2108 fprintf(stderr, "vacuum...\n");
2109 executeStatement(con, "vacuum analyze pgbench_branches");
2110 executeStatement(con, "vacuum analyze pgbench_tellers");
2111 executeStatement(con, "vacuum analyze pgbench_accounts");
2112 executeStatement(con, "vacuum analyze pgbench_history");
2118 fprintf(stderr, "set primary keys...\n");
2119 for (i = 0; i < lengthof(DDLINDEXes); i++)
2123 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
2125 if (index_tablespace != NULL)
2127 char *escape_tablespace;
2129 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
2130 strlen(index_tablespace));
2131 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
2132 " using index tablespace %s", escape_tablespace);
2133 PQfreemem(escape_tablespace);
2136 executeStatement(con, buffer);
2140 * create foreign keys
2144 fprintf(stderr, "set foreign keys...\n");
2145 for (i = 0; i < lengthof(DDLKEYs); i++)
2147 executeStatement(con, DDLKEYs[i]);
2151 fprintf(stderr, "done.\n");
/*
 * parseQuery -- rewrite :name variable references in raw_sql as numbered
 * $n placeholders and record each variable name in cmd->argv.
 *
 * The work is done on a pg_strdup() copy of raw_sql.  For every colon found,
 * parseVariable() is asked for a variable name; the placeholder text is
 * "$" followed by the current cmd->argc, and replaceVariable() substitutes
 * it in the copy.  When cmd->argc has already reached MAX_ARGS an error
 * naming the limit (MAX_ARGS - 1) is printed.
 * NOTE(review): the handling of a colon that does not start a variable, the
 * increment of cmd->argc and the return statements are not visible in this
 * extract -- confirm against the full file.
 */
2156 * Parse the raw sql and replace :param to $n.
2159 parseQuery(Command *cmd, const char *raw_sql)
2164 sql = pg_strdup(raw_sql);
2168 while ((p = strchr(p, ':')) != NULL)
2174 name = parseVariable(p, &eaten);
2184 if (cmd->argc >= MAX_ARGS)
2186 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
2191 sprintf(var, "$%d", cmd->argc);
2192 p = replaceVariable(&sql, p, eaten, var);
2194 cmd->argv[cmd->argc] = name;
/*
 * syntax_error -- report a script parsing error on stderr.
 *
 * Output starts with "source:lineno: msg", may be followed by " (more)" and
 * " at column N", and ends with the name of the offending command.  The
 * source line is then echoed, followed by a caret line indented by
 * column - 1 spaces that points at the error position.
 * The function is declared pg_attribute_noreturn().
 * NOTE(review): the conditions guarding the optional parts and the final
 * exit call are not visible in this extract -- confirm.
 */
2202 void pg_attribute_noreturn()
2203 syntax_error(const char *source, const int lineno,
2204 const char *line, const char *command,
2205 const char *msg, const char *more, const int column)
2207 fprintf(stderr, "%s:%d: %s", source, lineno, msg);
2209 fprintf(stderr, " (%s)", more);
2211 fprintf(stderr, " at column %d", column);
2212 fprintf(stderr, " in command \"%s\"\n", command);
2215 fprintf(stderr, "%s\n", line);
2220 for (i = 0; i < column - 1; i++)
2221 fprintf(stderr, " ");
2222 fprintf(stderr, "^ error found here\n");
/*
 * process_commands -- turn one script line into a Command.
 *
 * buf is cut at its first newline and leading whitespace is skipped; an
 * empty line or a "--" comment produces no command.  Meta commands are split
 * into words with strtok() (which modifies buf), each word being copied into
 * argv[] with its 1-based column stored in cols[]; argv[0] is the command
 * name.  Argument counts and keywords of \setrandom, \set, \sleep, \setshell
 * and \shell are then validated, problems being reported via syntax_error().
 * Any other line becomes a SQL command; in the extended and prepared query
 * modes it is additionally passed through parseQuery().
 *
 * source and lineno are only used for error reporting.
 * NOTE(review): the test that recognizes a meta command and the return
 * statements are not visible in this extract -- confirm against the full
 * file.
 */
2228 /* Parse a command; return a Command struct, or NULL if it's a comment */
2230 process_commands(char *buf, const char *source, const int lineno)
2232 const char delim[] = " \f\n\r\t\v";
2234 Command *my_commands;
2239 /* Make the string buf end at the next newline */
2240 if ((p = strchr(buf, '\n')) != NULL)
2243 /* Skip leading whitespace */
2245 while (isspace((unsigned char) *p))
2248 /* If the line is empty or actually a comment, we're done */
2249 if (*p == '\0' || strncmp(p, "--", 2) == 0)
2252 /* Allocate and initialize Command structure */
2253 my_commands = (Command *) pg_malloc(sizeof(Command));
2254 my_commands->line = pg_strdup(buf);
2255 my_commands->command_num = num_commands++;
2256 my_commands->type = 0; /* until set */
2257 my_commands->argc = 0;
2263 my_commands->type = META_COMMAND;
2266 tok = strtok(++p, delim);
2268 if (tok != NULL && pg_strcasecmp(tok, "set") == 0)
2273 my_commands->cols[j] = tok - buf + 1;
2274 my_commands->argv[j++] = pg_strdup(tok);
2275 my_commands->argc++;
2276 if (max_args >= 0 && my_commands->argc >= max_args)
2277 tok = strtok(NULL, "");
2279 tok = strtok(NULL, delim);
2282 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
2285 * parsing: \setrandom variable min max [uniform] \setrandom
2286 * variable min max (gaussian|exponential) threshold
2289 if (my_commands->argc < 4)
2291 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2292 "missing arguments", NULL, -1);
2297 if (my_commands->argc == 4 || /* uniform without/with
2298 * "uniform" keyword */
2299 (my_commands->argc == 5 &&
2300 pg_strcasecmp(my_commands->argv[4], "uniform") == 0))
2304 else if ( /* argc >= 5 */
2305 (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) ||
2306 (pg_strcasecmp(my_commands->argv[4], "exponential") == 0))
2308 if (my_commands->argc < 6)
2310 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2311 "missing threshold argument", my_commands->argv[4], -1);
2313 else if (my_commands->argc > 6)
2315 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2316 "too many arguments", my_commands->argv[4],
2317 my_commands->cols[6]);
2320 else /* cannot parse, unexpected arguments */
2322 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2323 "unexpected argument", my_commands->argv[4],
2324 my_commands->cols[4]);
2327 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
2329 if (my_commands->argc < 3)
2331 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2332 "missing argument", NULL, -1);
2335 expr_scanner_init(my_commands->argv[2], source, lineno,
2336 my_commands->line, my_commands->argv[0],
2337 my_commands->cols[2] - 1);
2339 if (expr_yyparse() != 0)
2341 /* dead code: exit done from syntax_error called by yyerror */
2345 my_commands->expr = expr_parse_result;
2347 expr_scanner_finish();
2349 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
2351 if (my_commands->argc < 2)
2353 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2354 "missing argument", NULL, -1);
2358 * Split argument into number and unit to allow "sleep 1ms" etc.
2359 * We don't have to terminate the number argument with null
2360 * because it will be parsed with atoi, which ignores trailing
2361 * non-digit characters.
2363 if (my_commands->argv[1][0] != ':')
2365 char *c = my_commands->argv[1];
2367 while (isdigit((unsigned char) *c))
2371 my_commands->argv[2] = c;
2372 if (my_commands->argc < 3)
2373 my_commands->argc = 3;
2377 if (my_commands->argc >= 3)
2379 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
2380 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
2381 pg_strcasecmp(my_commands->argv[2], "s") != 0)
2383 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2384 "unknown time unit, must be us, ms or s",
2385 my_commands->argv[2], my_commands->cols[2]);
2389 /* this should be an error?! */
2390 for (j = 3; j < my_commands->argc; j++)
2391 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
2392 my_commands->argv[0], my_commands->argv[j]);
2394 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
2396 if (my_commands->argc < 3)
2398 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2399 "missing argument", NULL, -1);
2402 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
/* argv[0] is the command name itself, so a bare \shell has argc == 1; require at least one word after it */
2404 if (my_commands->argc < 2)
2406 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2407 "missing command", NULL, -1);
2412 syntax_error(source, lineno, my_commands->line, my_commands->argv[0],
2413 "invalid command", NULL, -1);
2418 my_commands->type = SQL_COMMAND;
2423 my_commands->argv[0] = pg_strdup(p);
2424 my_commands->argc++;
2426 case QUERY_EXTENDED:
2427 case QUERY_PREPARED:
2428 if (!parseQuery(my_commands, p))
/*
 * read_line_from_file -- collect one complete input line from fd.
 *
 * The line is read in chunks of at most BUFSIZ bytes with fgets() into a
 * stack buffer and appended to a heap buffer, until a chunk ends with a
 * newline or fgets() reports end of input.  The heap buffer is enlarged
 * with pg_realloc() whenever another chunk is needed.
 * NOTE(review): the bookkeeping of "used" and "buflen" and the return/free
 * statements are not visible in this extract -- confirm against the full
 * file.
 */
2440 * Read a line from fd, and return it in a malloc'd buffer.
2441 * Return NULL at EOF.
2443 * The buffer will typically be larger than necessary, but we don't care
2444 * in this program, because we'll free it as soon as we've parsed the line.
2447 read_line_from_file(FILE *fd)
2449 char tmpbuf[BUFSIZ];
2451 size_t buflen = BUFSIZ;
2454 buf = (char *) palloc(buflen);
2457 while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
2459 size_t thislen = strlen(tmpbuf);
2461 /* Append tmpbuf to whatever we had already */
/* copying thislen + 1 bytes also carries over the terminating NUL of tmpbuf */
2462 memcpy(buf + used, tmpbuf, thislen + 1);
2465 /* Done if we collected a newline */
2466 if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
2469 /* Else, enlarge buf to ensure we can append next bufferload */
2471 buf = (char *) pg_realloc(buf, buflen);
/*
 * process_file -- load one custom script and register it in sql_files[].
 *
 * An error is printed when num_files has already reached MAX_FILES.  The
 * input is read line by line with read_line_from_file(); every line goes
 * through process_commands() and a NULL result is tested for (such lines
 * appear to be skipped).  Commands are collected in an array that grows by
 * COMMANDS_ALLOC_NUM entries at a time and is terminated with a NULL
 * pointer before being stored at sql_files[num_files++].
 *
 * A filename of "-" is special-cased (NOTE(review): presumably standard
 * input; the assignment is not visible in this extract).  If fopen() fails,
 * a message including strerror(errno) is printed and the array is freed.
 * NOTE(review): the return statements are not visible here -- confirm the
 * return value contract against the full file.
 */
2483 process_file(char *filename)
2485 #define COMMANDS_ALLOC_NUM 128
2487 Command **my_commands;
2494 if (num_files >= MAX_FILES)
2496 fprintf(stderr, "at most %d SQL files are allowed\n", MAX_FILES);
2500 alloc_num = COMMANDS_ALLOC_NUM;
2501 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2503 if (strcmp(filename, "-") == 0)
2505 else if ((fd = fopen(filename, "r")) == NULL)
2507 fprintf(stderr, "could not open file \"%s\": %s\n",
2508 filename, strerror(errno));
2509 pg_free(my_commands);
2516 while ((buf = read_line_from_file(fd)) != NULL)
2522 command = process_commands(buf, filename, lineno);
2526 if (command == NULL)
2529 my_commands[index] = command;
/* grow the array once the next free slot would fall outside it */
2532 if (index >= alloc_num)
2534 alloc_num += COMMANDS_ALLOC_NUM;
2535 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
/* a NULL entry marks the end of the command list */
2540 my_commands[index] = NULL;
2542 sql_files[num_files++] = my_commands;
/*
 * process_builtin -- parse a script held in memory into a command array.
 *
 * tb is scanned one line at a time (a line ends at a newline or at the end
 * of the string); each line is handed to process_commands() together with
 * source, which is only used to label error messages.  A NULL result is
 * tested for (such lines appear to be skipped).  Commands are stored in an
 * array that grows by COMMANDS_ALLOC_NUM entries at a time and ends with a
 * NULL pointer.
 * NOTE(review): the copying of each line into buf and the return statement
 * are not visible in this extract -- confirm against the full file.
 */
2548 process_builtin(char *tb, const char *source)
2550 #define COMMANDS_ALLOC_NUM 128
2552 Command **my_commands;
2558 alloc_num = COMMANDS_ALLOC_NUM;
2559 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2570 while (*tb && *tb != '\n')
2583 command = process_commands(buf, source, lineno);
2584 if (command == NULL)
2587 my_commands[index] = command;
/* grow the array once the next free slot would fall outside it */
2590 if (index >= alloc_num)
2592 alloc_num += COMMANDS_ALLOC_NUM;
2593 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
/* a NULL entry marks the end of the command list */
2597 my_commands[index] = NULL;
/*
 * printResults -- print the end-of-run summary on stdout.
 *
 * Reports the transaction type (a label chosen from ttype), scaling factor,
 * query mode, client and thread counts, the number of transactions
 * processed, optional skipped and late transaction counts, the latency
 * average (with standard deviation when per-transaction latencies were
 * accumulated), the rate-limit schedule lag, both tps figures and, last,
 * the average latency of every script command summed over all threads.
 *
 * normal_xacts <= 0 is tested before the latency statistics (they appear to
 * be skipped in that case).
 * NOTE(review): several guarding conditions are missing from this extract,
 * so exactly when each section is printed should be checked in the full
 * file.
 */
2602 /* print out results */
2604 printResults(int ttype, int64 normal_xacts, int nclients,
2605 TState *threads, int nthreads,
2606 instr_time total_time, instr_time conn_total_time,
2607 int64 total_latencies, int64 total_sqlats,
2608 int64 throttle_lag, int64 throttle_lag_max,
2609 int64 throttle_latency_skipped, int64 latency_late)
2611 double time_include,
2616 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2617 tps_include = normal_xacts / time_include;
/* tps_exclude removes conn_total_time divided by nclients from the elapsed time */
2618 tps_exclude = normal_xacts / (time_include -
2619 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
2622 s = "TPC-B (sort of)";
2623 else if (ttype == 2)
2624 s = "Update only pgbench_accounts";
2625 else if (ttype == 1)
2630 printf("transaction type: %s\n", s);
2631 printf("scaling factor: %d\n", scale);
2632 printf("query mode: %s\n", QUERYMODE[querymode]);
2633 printf("number of clients: %d\n", nclients);
2634 printf("number of threads: %d\n", nthreads);
2637 printf("number of transactions per client: %d\n", nxacts);
2638 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2639 normal_xacts, (int64) nxacts * nclients);
2643 printf("duration: %d s\n", duration);
2644 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2648 /* Remaining stats are nonsensical if we failed to execute any xacts */
2649 if (normal_xacts <= 0)
2652 if (throttle_delay && latency_limit)
2653 printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
2654 throttle_latency_skipped,
2655 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
2658 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
2659 latency_limit / 1000.0, latency_late,
2660 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
2662 if (throttle_delay || progress || latency_limit)
2664 /* compute and show latency average and standard deviation */
/* NOTE(review): the 0.001 factors suggest the totals are in microseconds (squared for total_sqlats) and are converted to ms -- confirm */
2665 double latency = 0.001 * total_latencies / normal_xacts;
2666 double sqlat = (double) total_sqlats / normal_xacts;
2668 printf("latency average: %.3f ms\n"
2669 "latency stddev: %.3f ms\n",
2670 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2674 /* only an average latency computed from the duration is available */
2675 printf("latency average: %.3f ms\n",
2676 1000.0 * duration * nclients / normal_xacts);
2682 * Report average transaction lag under rate limit throttling. This
2683 * is the delay between scheduled and actual start times for the
2684 * transaction. The measured lag may be caused by thread/client load,
2685 * the database load, or the Poisson throttling process.
2687 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2688 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2691 printf("tps = %f (including connections establishing)\n", tps_include);
2692 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2694 /* Report per-command latencies */
2699 for (i = 0; i < num_files; i++)
2704 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2706 printf("statement latencies in milliseconds:\n");
2708 for (commands = sql_files[i]; *commands != NULL; commands++)
2710 Command *command = *commands;
2711 int cnum = command->command_num;
2713 instr_time total_exec_elapsed;
2714 int total_exec_count;
2717 /* Accumulate per-thread data for command */
2718 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2719 total_exec_count = 0;
2720 for (t = 0; t < nthreads; t++)
2722 TState *thread = &threads[t];
2724 INSTR_TIME_ADD(total_exec_elapsed,
2725 thread->exec_elapsed[cnum]);
2726 total_exec_count += thread->exec_count[cnum];
/* average per execution; the guard avoids dividing by a zero count */
2729 if (total_exec_count > 0)
2730 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2734 printf("\t%f\t%s\n", total_time, command->line);
2742 main(int argc, char **argv)
2744 static struct option long_options[] = {
2745 /* systematic long/short named options */
2746 {"client", required_argument, NULL, 'c'},
2747 {"connect", no_argument, NULL, 'C'},
2748 {"debug", no_argument, NULL, 'd'},
2749 {"define", required_argument, NULL, 'D'},
2750 {"file", required_argument, NULL, 'f'},
2751 {"fillfactor", required_argument, NULL, 'F'},
2752 {"host", required_argument, NULL, 'h'},
2753 {"initialize", no_argument, NULL, 'i'},
2754 {"jobs", required_argument, NULL, 'j'},
2755 {"log", no_argument, NULL, 'l'},
2756 {"no-vacuum", no_argument, NULL, 'n'},
2757 {"port", required_argument, NULL, 'p'},
2758 {"progress", required_argument, NULL, 'P'},
2759 {"protocol", required_argument, NULL, 'M'},
2760 {"quiet", no_argument, NULL, 'q'},
2761 {"report-latencies", no_argument, NULL, 'r'},
2762 {"scale", required_argument, NULL, 's'},
2763 {"select-only", no_argument, NULL, 'S'},
2764 {"skip-some-updates", no_argument, NULL, 'N'},
2765 {"time", required_argument, NULL, 'T'},
2766 {"transactions", required_argument, NULL, 't'},
2767 {"username", required_argument, NULL, 'U'},
2768 {"vacuum-all", no_argument, NULL, 'v'},
2769 /* long-named only options */
2770 {"foreign-keys", no_argument, &foreign_keys, 1},
2771 {"index-tablespace", required_argument, NULL, 3},
2772 {"tablespace", required_argument, NULL, 2},
2773 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2774 {"sampling-rate", required_argument, NULL, 4},
2775 {"aggregate-interval", required_argument, NULL, 5},
2776 {"rate", required_argument, NULL, 'R'},
2777 {"latency-limit", required_argument, NULL, 'L'},
2778 {"progress-timestamp", no_argument, NULL, 6},
2783 int nclients = 1; /* default number of simulated clients */
2784 int nthreads = 1; /* default number of threads */
2785 int is_init_mode = 0; /* initialize mode? */
2786 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2787 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2788 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2789 * 2: skip update of branches and tellers */
2791 char *filename = NULL;
2792 bool scale_given = false;
2794 bool benchmarking_option_set = false;
2795 bool initialization_option_set = false;
2797 CState *state; /* status of clients */
2798 TState *threads; /* array of thread */
2800 instr_time start_time; /* start up time */
2801 instr_time total_time;
2802 instr_time conn_total_time;
2803 int64 total_xacts = 0;
2804 int64 total_latencies = 0;
2805 int64 total_sqlats = 0;
2806 int64 throttle_lag = 0;
2807 int64 throttle_lag_max = 0;
2808 int64 throttle_latency_skipped = 0;
2809 int64 latency_late = 0;
2814 #ifdef HAVE_GETRLIMIT
2824 progname = get_progname(argv[0]);
2828 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2833 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2835 puts("pgbench (PostgreSQL) " PG_VERSION);
2841 /* stderr is buffered on Win32. */
2842 setvbuf(stderr, NULL, _IONBF, 0);
2845 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2847 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2849 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2852 state = (CState *) pg_malloc(sizeof(CState));
2853 memset(state, 0, sizeof(CState));
2855 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
2863 pghost = pg_strdup(optarg);
2869 do_vacuum_accounts++;
2872 pgport = pg_strdup(optarg);
2879 benchmarking_option_set = true;
2883 benchmarking_option_set = true;
2886 benchmarking_option_set = true;
2887 nclients = atoi(optarg);
2888 if (nclients <= 0 || nclients > MAXCLIENTS)
2890 fprintf(stderr, "invalid number of clients: \"%s\"\n",
2894 #ifdef HAVE_GETRLIMIT
2895 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2896 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2897 #else /* but BSD doesn't ... */
2898 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2899 #endif /* RLIMIT_NOFILE */
2901 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2904 if (rlim.rlim_cur < nclients + 3)
2906 fprintf(stderr, "need at least %d open files, but system limit is %ld\n",
2907 nclients + 3, (long) rlim.rlim_cur);
2908 fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n");
2911 #endif /* HAVE_GETRLIMIT */
2913 case 'j': /* jobs */
2914 benchmarking_option_set = true;
2915 nthreads = atoi(optarg);
2918 fprintf(stderr, "invalid number of threads: \"%s\"\n",
2922 #ifndef ENABLE_THREAD_SAFETY
2925 fprintf(stderr, "threads are not supported on this platform; use -j1\n");
2928 #endif /* !ENABLE_THREAD_SAFETY */
2931 benchmarking_option_set = true;
2935 benchmarking_option_set = true;
2936 is_latencies = true;
2940 scale = atoi(optarg);
2943 fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg);
2948 benchmarking_option_set = true;
2951 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2954 nxacts = atoi(optarg);
2957 fprintf(stderr, "invalid number of transactions: \"%s\"\n",
2963 benchmarking_option_set = true;
2966 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n");
2969 duration = atoi(optarg);
2972 fprintf(stderr, "invalid duration: \"%s\"\n", optarg);
2977 login = pg_strdup(optarg);
2980 benchmarking_option_set = true;
2984 initialization_option_set = true;
2988 benchmarking_option_set = true;
2990 filename = pg_strdup(optarg);
2991 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2998 benchmarking_option_set = true;
3000 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
3002 fprintf(stderr, "invalid variable definition: \"%s\"\n",
3008 if (!putVariable(&state[0], "option", optarg, p))
3013 initialization_option_set = true;
3014 fillfactor = atoi(optarg);
3015 if (fillfactor < 10 || fillfactor > 100)
3017 fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg);
3022 benchmarking_option_set = true;
3025 fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f)\n");
3028 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
3029 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
3031 if (querymode >= NUM_QUERYMODE)
3033 fprintf(stderr, "invalid query mode (-M): \"%s\"\n",
3039 benchmarking_option_set = true;
3040 progress = atoi(optarg);
3043 fprintf(stderr, "invalid thread progress delay: \"%s\"\n",
3050 /* get a double from the beginning of option value */
3051 double throttle_value = atof(optarg);
3053 benchmarking_option_set = true;
3055 if (throttle_value <= 0.0)
3057 fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg);
3060 /* Invert rate limit into a time offset */
3061 throttle_delay = (int64) (1000000.0 / throttle_value);
3066 double limit_ms = atof(optarg);
3068 if (limit_ms <= 0.0)
3070 fprintf(stderr, "invalid latency limit: \"%s\"\n",
3074 benchmarking_option_set = true;
3075 latency_limit = (int64) (limit_ms * 1000);
3079 /* This covers long options which take no argument. */
3080 if (foreign_keys || unlogged_tables)
3081 initialization_option_set = true;
3083 case 2: /* tablespace */
3084 initialization_option_set = true;
3085 tablespace = pg_strdup(optarg);
3087 case 3: /* index-tablespace */
3088 initialization_option_set = true;
3089 index_tablespace = pg_strdup(optarg);
3092 benchmarking_option_set = true;
3093 sample_rate = atof(optarg);
3094 if (sample_rate <= 0.0 || sample_rate > 1.0)
3096 fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg);
3102 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
3105 benchmarking_option_set = true;
3106 agg_interval = atoi(optarg);
3107 if (agg_interval <= 0)
3109 fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n",
3116 progress_timestamp = true;
3117 benchmarking_option_set = true;
3120 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
3127 * Don't need more threads than there are clients. (This is not merely an
3128 * optimization; throttle_delay is calculated incorrectly below if some
3129 * threads have no clients assigned to them.)
3131 if (nthreads > nclients)
3132 nthreads = nclients;
3134 /* compute a per thread delay */
3135 throttle_delay *= nthreads;
3138 dbName = argv[optind];
3141 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
3143 else if (login != NULL && *login != '\0')
3151 if (benchmarking_option_set)
3153 fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n");
3162 if (initialization_option_set)
3164 fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n");
3169 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
3170 if (nxacts <= 0 && duration <= 0)
3171 nxacts = DEFAULT_NXACTS;
3173 /* --sampling-rate may be used only with -l */
3174 if (sample_rate > 0.0 && !use_log)
3176 fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n");
3180 /* --sampling-rate must not be used with --aggregate-interval */
3181 if (sample_rate > 0.0 && agg_interval > 0)
3183 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n");
3187 if (agg_interval > 0 && !use_log)
3189 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
3193 if (duration > 0 && agg_interval > duration)
3195 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
3199 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
3201 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
3206 * save main process id in the global variable because process id will be
3207 * changed after fork.
3209 main_pid = (int) getpid();
3210 progress_nclients = nclients;
3211 progress_nthreads = nthreads;
3215 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
3216 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
3218 /* copy any -D switch values to all clients */
3219 for (i = 1; i < nclients; i++)
3224 for (j = 0; j < state[0].nvariables; j++)
3226 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
3235 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
3236 pghost, pgport, nclients, nxacts, dbName);
3238 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
3239 pghost, pgport, nclients, duration, dbName);
3242 /* opening connection... */
3247 if (PQstatus(con) == CONNECTION_BAD)
3249 fprintf(stderr, "connection to database \"%s\" failed\n", dbName);
3250 fprintf(stderr, "%s", PQerrorMessage(con));
3257 * get the scaling factor that should be same as count(*) from
3258 * pgbench_branches if this is not a custom query
3260 res = PQexec(con, "select count(*) from pgbench_branches");
3261 if (PQresultStatus(res) != PGRES_TUPLES_OK)
3263 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3265 fprintf(stderr, "%s", PQerrorMessage(con));
3266 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
3268 fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con));
3273 scale = atoi(PQgetvalue(res, 0, 0));
3276 fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n",
3277 PQgetvalue(res, 0, 0));
3282 /* warn if we override user-given -s switch */
3285 "scale option ignored, using count from pgbench_branches table (%d)\n",
3290 * :scale variables normally get -s or database scale, but don't override
3291 * an explicit -D switch
3293 if (getVariable(&state[0], "scale") == NULL)
3295 snprintf(val, sizeof(val), "%d", scale);
3296 for (i = 0; i < nclients; i++)
3298 if (!putVariable(&state[i], "startup", "scale", val))
3304 * Define a :client_id variable that is unique per connection. But don't
3305 * override an explicit -D switch.
3307 if (getVariable(&state[0], "client_id") == NULL)
3309 for (i = 0; i < nclients; i++)
3311 snprintf(val, sizeof(val), "%d", i);
3312 if (!putVariable(&state[i], "startup", "client_id", val))
3319 fprintf(stderr, "starting vacuum...");
3320 tryExecuteStatement(con, "vacuum pgbench_branches");
3321 tryExecuteStatement(con, "vacuum pgbench_tellers");
3322 tryExecuteStatement(con, "truncate pgbench_history");
3323 fprintf(stderr, "end.\n");
3325 if (do_vacuum_accounts)
3327 fprintf(stderr, "starting vacuum pgbench_accounts...");
3328 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
3329 fprintf(stderr, "end.\n");
3334 /* set random seed */
3335 INSTR_TIME_SET_CURRENT(start_time);
3336 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
3338 /* process builtin SQL scripts */
3342 sql_files[0] = process_builtin(tpc_b,
3343 "<builtin: TPC-B (sort of)>");
3348 sql_files[0] = process_builtin(select_only,
3349 "<builtin: select only>");
3354 sql_files[0] = process_builtin(simple_update,
3355 "<builtin: simple update>");
3363 /* set up thread data structures */
3364 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
3367 for (i = 0; i < nthreads; i++)
3369 TState *thread = &threads[i];
3372 thread->state = &state[nclients_dealt];
3374 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
3375 thread->random_state[0] = random();
3376 thread->random_state[1] = random();
3377 thread->random_state[2] = random();
3378 thread->throttle_latency_skipped = 0;
3379 thread->latency_late = 0;
3381 nclients_dealt += thread->nstate;
3385 /* Reserve memory for the thread to store per-command latencies */
3388 thread->exec_elapsed = (instr_time *)
3389 pg_malloc(sizeof(instr_time) * num_commands);
3390 thread->exec_count = (int *)
3391 pg_malloc(sizeof(int) * num_commands);
3393 for (t = 0; t < num_commands; t++)
3395 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
3396 thread->exec_count[t] = 0;
3401 thread->exec_elapsed = NULL;
3402 thread->exec_count = NULL;
3406 /* all clients must be assigned to a thread */
3407 Assert(nclients_dealt == nclients);
3409 /* get start up time */
3410 INSTR_TIME_SET_CURRENT(start_time);
3412 /* set alarm if duration is specified. */
3417 #ifdef ENABLE_THREAD_SAFETY
3418 for (i = 0; i < nthreads; i++)
3420 TState *thread = &threads[i];
3422 INSTR_TIME_SET_CURRENT(thread->start_time);
3424 /* the first thread (i = 0) is executed by main thread */
3427 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
3429 if (err != 0 || thread->thread == INVALID_THREAD)
3431 fprintf(stderr, "could not create thread: %s\n", strerror(err));
3437 thread->thread = INVALID_THREAD;
3441 INSTR_TIME_SET_CURRENT(threads[0].start_time);
3442 threads[0].thread = INVALID_THREAD;
3443 #endif /* ENABLE_THREAD_SAFETY */
3445 /* wait for threads and accumulate results */
3446 INSTR_TIME_SET_ZERO(conn_total_time);
3447 for (i = 0; i < nthreads; i++)
3449 TState *thread = &threads[i];
3452 #ifdef ENABLE_THREAD_SAFETY
3453 if (threads[i].thread == INVALID_THREAD)
3454 /* actually run this thread directly in the main thread */
3455 (void) threadRun(thread);
3457 /* wait for other threads. should check that 0 is returned? */
3458 pthread_join(thread->thread, NULL);
3460 (void) threadRun(thread);
3461 #endif /* ENABLE_THREAD_SAFETY */
3463 /* thread level stats */
3464 throttle_lag += thread->throttle_lag;
3465 throttle_latency_skipped += threads->throttle_latency_skipped;
3466 latency_late += thread->latency_late;
3467 if (throttle_lag_max > thread->throttle_lag_max)
3468 throttle_lag_max = thread->throttle_lag_max;
3469 INSTR_TIME_ADD(conn_total_time, thread->conn_time);
3471 /* client-level stats */
3472 for (j = 0; j < thread->nstate; j++)
3474 total_xacts += thread->state[j].cnt;
3475 total_latencies += thread->state[j].txn_latencies;
3476 total_sqlats += thread->state[j].txn_sqlats;
3479 disconnect_all(state, nclients);
3482 * XXX We compute results as though every client of every thread started
3483 * and finished at the same time. That model can diverge noticeably from
3484 * reality for a short benchmark run involving relatively many threads.
3485 * The first thread may process notably many transactions before the last
3486 * thread begins. Improving the model alone would bring limited benefit,
3487 * because performance during those periods of partial thread count can
3488 * easily exceed steady state performance. This is one of the many ways
3489 * short runs convey deceptive performance figures.
3491 INSTR_TIME_SET_CURRENT(total_time);
3492 INSTR_TIME_SUBTRACT(total_time, start_time);
3493 printResults(ttype, total_xacts, nclients, threads, nthreads,
3494 total_time, conn_total_time, total_latencies, total_sqlats,
3495 throttle_lag, throttle_lag_max, throttle_latency_skipped,
3502 threadRun(void *arg)
3504 TState *thread = (TState *) arg;
3505 CState *state = thread->state;
3506 FILE *logfile = NULL; /* per-thread log file */
3509 int nstate = thread->nstate;
3510 int remains = nstate; /* number of remaining clients */
3513 /* for reporting progress: */
3514 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
3515 int64 last_report = thread_start;
3516 int64 next_report = last_report + (int64) progress * 1000000;
3517 int64 last_count = 0,
3526 * Initialize throttling rate target for all of the thread's clients. It
3527 * might be a little more accurate to reset thread->start_time here too.
3528 * The possible drift seems too small relative to typical throttle delay
3529 * times to worry about it.
3531 INSTR_TIME_SET_CURRENT(start);
3532 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3533 thread->throttle_lag = 0;
3534 thread->throttle_lag_max = 0;
3536 INSTR_TIME_SET_ZERO(thread->conn_time);
3538 /* open log file if requested */
3543 if (thread->tid == 0)
3544 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3546 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3547 logfile = fopen(logpath, "w");
3549 if (logfile == NULL)
3551 fprintf(stderr, "could not open logfile \"%s\": %s\n",
3552 logpath, strerror(errno));
3559 /* make connections to the database */
3560 for (i = 0; i < nstate; i++)
3562 if ((state[i].con = doConnect()) == NULL)
3567 /* time after thread and connections set up */
3568 INSTR_TIME_SET_CURRENT(thread->conn_time);
3569 INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
3571 agg_vals_init(&aggs, thread->start_time);
3573 /* send start up queries in async manner */
3574 for (i = 0; i < nstate; i++)
3576 CState *st = &state[i];
3577 Command **commands = sql_files[st->use_file];
3578 int prev_ecnt = st->ecnt;
3580 st->use_file = getrand(thread, 0, num_files - 1);
3581 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3582 remains--; /* I've aborted */
3584 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3586 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3588 remains--; /* I've aborted */
3597 int maxsock; /* max socket number to be waited */
3601 FD_ZERO(&input_mask);
3604 min_usec = PG_INT64_MAX;
3605 for (i = 0; i < nstate; i++)
3607 CState *st = &state[i];
3608 Command **commands = sql_files[st->use_file];
3611 if (st->con == NULL)
3615 else if (st->sleeping)
3617 if (st->throttling && timer_exceeded)
3619 /* interrupt client which has not started a transaction */
3622 st->throttling = false;
3627 else /* just a nap from the script */
3631 if (min_usec == PG_INT64_MAX)
3635 INSTR_TIME_SET_CURRENT(now);
3636 now_usec = INSTR_TIME_GET_MICROSEC(now);
3639 this_usec = st->txn_scheduled - now_usec;
3640 if (min_usec > this_usec)
3641 min_usec = this_usec;
3644 else if (commands[st->state]->type == META_COMMAND)
3646 min_usec = 0; /* the connection is ready to run */
3650 sock = PQsocket(st->con);
3653 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3657 FD_SET(sock, &input_mask);
3663 /* also wake up to print the next progress report on time */
3664 if (progress && min_usec > 0 && thread->tid == 0)
3666 /* get current time if needed */
3671 INSTR_TIME_SET_CURRENT(now);
3672 now_usec = INSTR_TIME_GET_MICROSEC(now);
3675 if (now_usec >= next_report)
3677 else if ((next_report - now_usec) < min_usec)
3678 min_usec = next_report - now_usec;
3682 * Sleep until we receive data from the server, or a nap-time
3683 * specified in the script ends, or it's time to print a progress
3686 if (min_usec > 0 && maxsock != -1)
3688 int nsocks; /* return from select(2) */
3690 if (min_usec != PG_INT64_MAX)
3692 struct timeval timeout;
3694 timeout.tv_sec = min_usec / 1000000;
3695 timeout.tv_usec = min_usec % 1000000;
3696 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3699 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3704 /* must be something wrong */
3705 fprintf(stderr, "select() failed: %s\n", strerror(errno));
3710 /* ok, backend returns reply */
3711 for (i = 0; i < nstate; i++)
3713 CState *st = &state[i];
3714 Command **commands = sql_files[st->use_file];
3715 int prev_ecnt = st->ecnt;
3717 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3718 || commands[st->state]->type == META_COMMAND))
3720 if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
3721 remains--; /* I've aborted */
3724 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3726 fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
3728 remains--; /* I've aborted */
3734 /* progress report by thread 0 for all threads */
3735 if (progress && thread->tid == 0)
3737 instr_time now_time;
3740 INSTR_TIME_SET_CURRENT(now_time);
3741 now = INSTR_TIME_GET_MICROSEC(now_time);
3742 if (now >= next_report)
3744 /* generate and show report */
3750 int64 run = now - last_report;
3760 * Add up the statistics of all threads.
3762 * XXX: No locking. There is no guarantee that we get an
3763 * atomic snapshot of the transaction count and latencies, so
3764 * these figures can well be off by a small amount. The
3765 * progress is report's purpose is to give a quick overview of
3766 * how the test is going, so that shouldn't matter too much.
3767 * (If a read from a 64-bit integer is not atomic, you might
3768 * get a "torn" read and completely bogus latencies though!)
3770 for (i = 0; i < progress_nclients; i++)
3772 count += state[i].cnt;
3773 lats += state[i].txn_latencies;
3774 sqlats += state[i].txn_sqlats;
3777 for (i = 0; i < progress_nthreads; i++)
3779 skipped += thread[i].throttle_latency_skipped;
3780 lags += thread[i].throttle_lag;
3783 total_run = (now - thread_start) / 1000000.0;
3784 tps = 1000000.0 * (count - last_count) / run;
3785 latency = 0.001 * (lats - last_lats) / (count - last_count);
3786 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3787 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3788 lag = 0.001 * (lags - last_lags) / (count - last_count);
3790 if (progress_timestamp)
3791 sprintf(tbuf, "%.03f s",
3792 INSTR_TIME_GET_MILLISEC(now_time) / 1000.0);
3794 sprintf(tbuf, "%.1f s", total_run);
3797 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
3798 tbuf, tps, latency, stdev);
3802 fprintf(stderr, ", lag %.3f ms", lag);
3804 fprintf(stderr, ", " INT64_FORMAT " skipped",
3805 skipped - last_skipped);
3807 fprintf(stderr, "\n");
3811 last_sqlats = sqlats;
3814 last_skipped = skipped;
3817 * Ensure that the next report is in the future, in case
3818 * pgbench/postgres got stuck somewhere.
3822 next_report += (int64) progress *1000000;
3823 } while (now >= next_report);
3829 INSTR_TIME_SET_CURRENT(start);
3830 disconnect_all(state, nstate);
3831 INSTR_TIME_SET_CURRENT(end);
3832 INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
3839 * Support for duration option: set timer_exceeded after so many seconds.
/* SIGALRM handler: records that the duration timer has fired. */
3845 handle_sig_alarm(SIGNAL_ARGS)
3847 timer_exceeded = true;
/*
 * Signal-based duration timer: installs handle_sig_alarm as the SIGALRM
 * handler so that timer_exceeded gets set when the alarm goes off.
 */
3851 setalarm(int seconds)
3853 pqsignal(SIGALRM, handle_sig_alarm);
/*
 * Timer-queue callback registered by the Windows setalarm(): records that
 * the duration timer has fired.  Neither argument is used.
 */
3859 static VOID CALLBACK
3860 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3862 timer_exceeded = true;
/*
 * Windows duration timer: creates a timer queue and a one-shot timer that
 * calls win32_timer_callback after the given number of seconds.  Reports
 * "failed to set timer" when the duration does not fit in a DWORD count of
 * milliseconds or when the timer cannot be created.
 */
3866 setalarm(int seconds)
3871 /* This function will be called at most once, so we can cheat a bit. */
3872 queue = CreateTimerQueue();
3873 if (seconds > ((DWORD) -1) / 1000 ||
3874 !CreateTimerQueueTimer(&timer, queue,
/*
 * Multiply as DWORD: the range check above admits values of seconds whose
 * product with 1000 does not fit in a signed int.
 */
3875 win32_timer_callback, NULL, (DWORD) seconds * 1000, 0,
3876 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3878 fprintf(stderr, "failed to set timer\n");
3883 /* partial pthread implementation for Windows */
/* Per-thread record used by the Windows pthread_create()/pthread_join() emulation. */
3885 typedef struct win32_pthread
3888 void *(*routine) (void *); /* start routine invoked by win32_pthread_run() */
/*
 * Thread entry point handed to _beginthreadex() by pthread_create(): calls
 * the stored start routine with the stored argument and keeps the returned
 * pointer in th->result.
 */
3893 static unsigned __stdcall
3894 win32_pthread_run(void *arg)
3896 win32_pthread *th = (win32_pthread *) arg;
3898 th->result = th->routine(th->arg);
/*
 * Minimal pthread_create() for Windows: allocates a win32_pthread record
 * with pg_malloc(), stores start_routine in it, and starts the thread via
 * _beginthreadex() with win32_pthread_run as the entry point.  The handle
 * returned by _beginthreadex() is checked against NULL.
 */
3904 pthread_create(pthread_t *thread,
3905 pthread_attr_t *attr,
3906 void *(*start_routine) (void *),
3912 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3913 th->routine = start_routine;
3917 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3918 if (th->handle == NULL)
3930 pthread_join(pthread_t th, void **thread_return)
3932 if (th == NULL || th->handle == NULL)
3933 return errno = EINVAL;
3935 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3937 _dosmaperr(GetLastError());
3942 *thread_return = th->result;
3944 CloseHandle(th->handle);