4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * contrib/pgbench/pgbench.c
8 * Copyright (c) 2000-2014, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
44 #ifdef HAVE_SYS_SELECT_H
45 #include <sys/select.h>
48 #ifdef HAVE_SYS_RESOURCE_H
49 #include <sys/resource.h> /* for getrlimit */
53 #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
57 * Multi-platform pthread implementations
61 /* Use native win32 threads on Windows */
62 typedef struct win32_pthread *pthread_t;
63 typedef int pthread_attr_t;
65 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
66 static int pthread_join(pthread_t th, void **thread_return);
67 #elif defined(ENABLE_THREAD_SAFETY)
68 /* Use platform-dependent pthread capability */
71 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
72 #define PTHREAD_FORK_EMULATION
75 #define pthread_t pg_pthread_t
76 #define pthread_attr_t pg_pthread_attr_t
77 #define pthread_create pg_pthread_create
78 #define pthread_join pg_pthread_join
80 typedef struct fork_pthread *pthread_t;
81 typedef int pthread_attr_t;
83 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
84 static int pthread_join(pthread_t th, void **thread_return);
88 /********************************************************************
89 * some configurable parameters */
91 /* max number of clients allowed */
93 #define MAXCLIENTS (FD_SETSIZE - 10)
95 #define MAXCLIENTS 1024
98 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
99 #define DEFAULT_NXACTS 10 /* default nxacts */
101 int nxacts = 0; /* number of transactions per client */
102 int duration = 0; /* duration in seconds */
105 * scaling factor. for example, scale = 10 will make 1000000 tuples in
106 * pgbench_accounts table.
111 * fillfactor. for example, fillfactor = 90 will use only 90 percent
112 * space during inserts and leave 10 percent free.
114 int fillfactor = 100;
117 * create foreign key constraints on the tables?
119 int foreign_keys = 0;
122 * use unlogged tables?
124 int unlogged_tables = 0;
127 * log sampling rate (1.0 = log everything, 0.0 = option not given)
129 double sample_rate = 0.0;
132 * When threads are throttled to a given rate limit, this is the target delay
133 * to reach that rate in usec. 0 is the default and means no throttling.
135 int64 throttle_delay = 0;
138 * tablespace selection
140 char *tablespace = NULL;
141 char *index_tablespace = NULL;
144 * end of configurable parameters
145 *********************************************************************/
147 #define nbranches 1 /* Makes little sense to change this. Change
150 #define naccounts 100000
153 * The scale factor at/beyond which 32bit integers are incapable of storing
156 * Although the actual threshold is 21474, we use 20000 because it is easier to
157 * document and remember, and isn't that far away from the real threshold.
159 #define SCALE_32BIT_THRESHOLD 20000
161 bool use_log; /* log transaction latencies to a file */
162 bool use_quiet; /* quiet logging onto stderr */
163 int agg_interval; /* log aggregates instead of individual
165 int progress = 0; /* thread progress report every this seconds */
166 int progress_nclients = 0; /* number of clients for progress
168 int progress_nthreads = 0; /* number of threads for progress
170 bool is_connect; /* establish connection for each transaction */
171 bool is_latencies; /* report per-command latencies */
172 int main_pid; /* main process id used in log filename */
178 const char *progname;
180 volatile bool timer_exceeded = false; /* flag from signal handler */
182 /* variable definitions */
185 char *name; /* variable name */
186 char *value; /* its value */
189 #define MAX_FILES 128 /* max number of SQL script files allowed */
190 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
193 * structures used in custom query mode
198 PGconn *con; /* connection handle to DB */
199 int id; /* client No. */
200 int state; /* state No. */
201 int cnt; /* xacts count */
202 int ecnt; /* error count */
203 int listen; /* 0 indicates that an async query has been
205 int sleeping; /* 1 indicates that the client is napping */
206 bool throttling; /* whether nap is for throttling */
207 int64 until; /* napping until (usec) */
208 Variable *variables; /* array of variable definitions */
210 instr_time txn_begin; /* used for measuring transaction latencies */
211 instr_time stmt_begin; /* used for measuring statement latencies */
212 int64 txn_latencies; /* cumulated latencies */
213 int64 txn_sqlats; /* cumulated square latencies */
214 bool is_throttled; /* whether transaction throttling is done */
215 int use_file; /* index in sql_files for this client */
216 bool prepared[MAX_FILES];
220 * Thread state and result
224 int tid; /* thread id */
225 pthread_t thread; /* thread handle */
226 CState *state; /* array of CState */
227 int nstate; /* length of state[] */
228 instr_time start_time; /* thread start time */
229 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
230 int *exec_count; /* number of cmd executions (per Command) */
231 unsigned short random_state[3]; /* separate randomness for each thread */
232 int64 throttle_trigger; /* previous/next throttling (us) */
233 int64 throttle_lag; /* total transaction lag behind throttling */
234 int64 throttle_lag_max; /* max transaction lag */
237 #define INVALID_THREAD ((pthread_t) 0)
241 instr_time conn_time;
246 int64 throttle_lag_max;
250 * queries read from files
252 #define SQL_COMMAND 1
253 #define META_COMMAND 2
256 typedef enum QueryMode
258 QUERY_SIMPLE, /* simple query */
259 QUERY_EXTENDED, /* extended query */
260 QUERY_PREPARED, /* extended query with prepared statements */
264 static QueryMode querymode = QUERY_SIMPLE;
265 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
269 char *line; /* full text of command line */
270 int command_num; /* unique index of this Command struct */
271 int type; /* command type (SQL_COMMAND or META_COMMAND) */
272 int argc; /* number of command words */
273 char *argv[MAX_ARGS]; /* command word list */
279 long start_time; /* when does the interval start */
280 int cnt; /* number of transactions */
281 double min_duration; /* min/max durations */
283 double sum; /* sum(duration), sum(duration^2) - for
289 static Command **sql_files[MAX_FILES]; /* SQL script files */
290 static int num_files; /* number of script files */
291 static int num_commands = 0; /* total number of Command structs */
292 static int debug = 0; /* debug flag */
294 /* default scenario */
295 static char *tpc_b = {
296 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
297 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
298 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
299 "\\setrandom aid 1 :naccounts\n"
300 "\\setrandom bid 1 :nbranches\n"
301 "\\setrandom tid 1 :ntellers\n"
302 "\\setrandom delta -5000 5000\n"
304 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
305 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
306 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
307 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
308 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
313 static char *simple_update = {
314 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
315 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
316 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
317 "\\setrandom aid 1 :naccounts\n"
318 "\\setrandom bid 1 :nbranches\n"
319 "\\setrandom tid 1 :ntellers\n"
320 "\\setrandom delta -5000 5000\n"
322 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
323 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
324 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
329 static char *select_only = {
330 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
331 "\\setrandom aid 1 :naccounts\n"
332 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
335 /* Function prototypes */
336 static void setalarm(int seconds);
337 static void *threadRun(void *arg);
342 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
344 " %s [OPTION]... [DBNAME]\n"
345 "\nInitialization options:\n"
346 " -i, --initialize invokes initialization mode\n"
347 " -F, --fillfactor=NUM set fill factor\n"
348 " -n, --no-vacuum do not run VACUUM after initialization\n"
349 " -q, --quiet quiet logging (one message each 5 seconds)\n"
350 " -s, --scale=NUM scaling factor\n"
351 " --foreign-keys create foreign key constraints between tables\n"
352 " --index-tablespace=TABLESPACE\n"
353 " create indexes in the specified tablespace\n"
354 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
355 " --unlogged-tables create tables as unlogged tables\n"
356 "\nBenchmarking options:\n"
357 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
358 " -C, --connect establish new connection for each transaction\n"
359 " -D, --define=VARNAME=VALUE\n"
360 " define variable for use by custom script\n"
361 " -f, --file=FILENAME read transaction script from FILENAME\n"
362 " -j, --jobs=NUM number of threads (default: 1)\n"
363 " -l, --log write transaction times to log file\n"
364 " -M, --protocol=simple|extended|prepared\n"
365 " protocol for submitting queries (default: simple)\n"
366 " -n, --no-vacuum do not run VACUUM before tests\n"
367 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
368 " -P, --progress=NUM show thread progress report every NUM seconds\n"
369 " -r, --report-latencies report average latency per command\n"
370 " -R, --rate=NUM target rate in transactions per second\n"
371 " -s, --scale=NUM report this scale factor in output\n"
372 " -S, --select-only perform SELECT-only transactions\n"
373 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
374 " -T, --time=NUM duration of benchmark test in seconds\n"
375 " -v, --vacuum-all vacuum all four standard tables before tests\n"
376 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
377 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
378 "\nCommon options:\n"
379 " -d, --debug print debugging output\n"
380 " -h, --host=HOSTNAME database server host or socket directory\n"
381 " -p, --port=PORT database server port number\n"
382 " -U, --username=USERNAME connect as specified database user\n"
383 " -V, --version output version information, then exit\n"
384 " -?, --help show this help, then exit\n"
386 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
391 * strtoint64 -- convert a string to 64-bit integer
393 * This function is a modified version of scanint8() from
394 * src/backend/utils/adt/int8.c.
397 strtoint64(const char *str)
399 const char *ptr = str;
404 * Do our own scan, rather than relying on sscanf which might be broken
408 /* skip leading spaces */
409 while (*ptr && isspace((unsigned char) *ptr))
418 * Do an explicit check for INT64_MIN. Ugly though this is, it's
419 * cleaner than trying to get the loop below to handle it portably.
421 if (strncmp(ptr, "9223372036854775808", 19) == 0)
423 result = -INT64CONST(0x7fffffffffffffff) - 1;
429 else if (*ptr == '+')
432 /* require at least one digit */
433 if (!isdigit((unsigned char) *ptr))
434 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
437 while (*ptr && isdigit((unsigned char) *ptr))
439 int64 tmp = result * 10 + (*ptr++ - '0');
441 if ((tmp / 10) != result) /* overflow? */
442 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
448 /* allow trailing whitespace, but not other trailing chars */
449 while (*ptr != '\0' && isspace((unsigned char) *ptr))
453 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
455 return ((sign < 0) ? -result : result);
458 /* random number generator: uniform distribution from min to max inclusive */
460 getrand(TState *thread, int64 min, int64 max)
463 * Odd coding is so that min and max have approximately the same chance of
464 * being selected as do numbers between them.
466 * pg_erand48() is thread-safe and concurrent, which is why we use it
467 * rather than random(), which in glibc is non-reentrant, and therefore
468 * protected by a mutex, and therefore a bottleneck on machines with many
471 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
474 /* call PQexec() and exit() on failure */
476 executeStatement(PGconn *con, const char *sql)
480 res = PQexec(con, sql);
481 if (PQresultStatus(res) != PGRES_COMMAND_OK)
483 fprintf(stderr, "%s", PQerrorMessage(con));
489 /* set up a connection to the backend */
494 static char *password = NULL;
498 * Start the connection. Loop until we have a password if requested by
503 #define PARAMS_ARRAY_SIZE 7
505 const char *keywords[PARAMS_ARRAY_SIZE];
506 const char *values[PARAMS_ARRAY_SIZE];
508 keywords[0] = "host";
510 keywords[1] = "port";
512 keywords[2] = "user";
514 keywords[3] = "password";
515 values[3] = password;
516 keywords[4] = "dbname";
518 keywords[5] = "fallback_application_name";
519 values[5] = progname;
525 conn = PQconnectdbParams(keywords, values, true);
529 fprintf(stderr, "Connection to database \"%s\" failed\n",
534 if (PQstatus(conn) == CONNECTION_BAD &&
535 PQconnectionNeedsPassword(conn) &&
539 password = simple_prompt("Password: ", 100, false);
544 /* check to see that the backend connection was successfully made */
545 if (PQstatus(conn) == CONNECTION_BAD)
547 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
548 dbName, PQerrorMessage(conn));
556 /* throw away response from backend */
558 discard_response(CState *state)
564 res = PQgetResult(state->con);
571 compareVariables(const void *v1, const void *v2)
573 return strcmp(((const Variable *) v1)->name,
574 ((const Variable *) v2)->name);
578 getVariable(CState *st, char *name)
583 /* On some versions of Solaris, bsearch of zero items dumps core */
584 if (st->nvariables <= 0)
588 var = (Variable *) bsearch((void *) &key,
589 (void *) st->variables,
599 /* check whether the name consists of alphabets, numerals and underscores. */
601 isLegalVariableName(const char *name)
605 for (i = 0; name[i] != '\0'; i++)
607 if (!isalnum((unsigned char) name[i]) && name[i] != '_')
615 putVariable(CState *st, const char *context, char *name, char *value)
621 /* On some versions of Solaris, bsearch of zero items dumps core */
622 if (st->nvariables > 0)
623 var = (Variable *) bsearch((void *) &key,
624 (void *) st->variables,
636 * Check for the name only when declaring a new variable to avoid
639 if (!isLegalVariableName(name))
641 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
646 newvars = (Variable *) pg_realloc(st->variables,
647 (st->nvariables + 1) * sizeof(Variable));
649 newvars = (Variable *) pg_malloc(sizeof(Variable));
651 st->variables = newvars;
653 var = &newvars[st->nvariables];
655 var->name = pg_strdup(name);
656 var->value = pg_strdup(value);
660 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
667 /* dup then free, in case value is pointing at this variable */
668 val = pg_strdup(value);
678 parseVariable(const char *sql, int *eaten)
686 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
691 memcpy(name, &sql[1], i - 1);
699 replaceVariable(char **sql, char *param, int len, char *value)
701 int valueln = strlen(value);
705 size_t offset = param - *sql;
707 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
708 param = *sql + offset;
712 memmove(param + valueln, param + len, strlen(param + len) + 1);
713 strncpy(param, value, valueln);
715 return param + valueln;
719 assignVariables(CState *st, char *sql)
726 while ((p = strchr(p, ':')) != NULL)
730 name = parseVariable(p, &eaten);
740 val = getVariable(st, name);
748 p = replaceVariable(&sql, p, eaten, val);
755 getQueryParams(CState *st, const Command *command, const char **params)
759 for (i = 0; i < command->argc - 1; i++)
760 params[i] = getVariable(st, command->argv[i + 1]);
764 * Run a shell command. The result is assigned to the variable if not NULL.
765 * Return true if succeeded, or false on error.
768 runShellCommand(CState *st, char *variable, char **argv, int argc)
770 char command[SHELL_COMMAND_SIZE];
779 * Join arguments with whitespace separators. Arguments starting with
780 * exactly one colon are treated as variables:
781 * name - append a string "name"
782 * :var - append a variable named 'var'
783 * ::name - append a string ":name"
786 for (i = 0; i < argc; i++)
791 if (argv[i][0] != ':')
793 arg = argv[i]; /* a string literal */
795 else if (argv[i][1] == ':')
797 arg = argv[i] + 1; /* a string literal starting with colons */
799 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
801 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
805 arglen = strlen(arg);
806 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
808 fprintf(stderr, "%s: too long shell command\n", argv[0]);
813 command[len++] = ' ';
814 memcpy(command + len, arg, arglen);
820 /* Fast path for non-assignment case */
821 if (variable == NULL)
826 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
832 /* Execute the command with pipe and read the standard output. */
833 if ((fp = popen(command, "r")) == NULL)
835 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
838 if (fgets(res, sizeof(res), fp) == NULL)
841 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
846 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
850 /* Check whether the result is an integer and assign it to the variable */
851 retval = (int) strtol(res, &endptr, 10);
852 while (*endptr != '\0' && isspace((unsigned char) *endptr))
854 if (*res == '\0' || *endptr != '\0')
856 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
859 snprintf(res, sizeof(res), "%d", retval);
860 if (!putVariable(st, "setshell", variable, res))
864 printf("shell parameter name: %s, value: %s\n", argv[1], res);
869 #define MAX_PREPARE_NAME 32
871 preparedStatementName(char *buffer, int file, int state)
873 sprintf(buffer, "P%d_%d", file, state);
877 clientDone(CState *st, bool ok)
879 (void) ok; /* unused */
886 return false; /* always false */
891 agg_vals_init(AggVals *aggs, instr_time start)
894 aggs->cnt = 0; /* number of transactions */
895 aggs->sum = 0; /* SUM(duration) */
896 aggs->sum2 = 0; /* SUM(duration*duration) */
898 /* min and max transaction duration */
899 aggs->min_duration = 0;
900 aggs->max_duration = 0;
902 /* start of the current interval */
903 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
906 /* return false iff client should be disconnected */
908 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
912 bool trans_needs_throttle = false;
915 commands = sql_files[st->use_file];
918 * Handle throttling once per transaction by sleeping. It is simpler to
919 * do this here rather than at the end, because so much complicated logic
920 * happens below when statements finish.
922 if (throttle_delay && !st->is_throttled)
925 * Use inverse transform sampling to randomly generate a delay, such
926 * that the series of delays will approximate a Poisson distribution
927 * centered on the throttle_delay time.
929 * 10000 implies a 9.2 (-log(1/10000)) to 0.0 (log 1) delay
930 * multiplier, and results in a 0.055 % target underestimation bias:
932 * SELECT 1.0/AVG(-LN(i/10000.0)) FROM generate_series(1,10000) AS i;
933 * = 1.000552717032611116335474
935 * If transactions are too slow or a given wait is shorter than a
936 * transaction, the next transaction will start right away.
938 int64 wait = (int64) (throttle_delay *
939 1.00055271703 * -log(getrand(thread, 1, 10000) / 10000.0));
941 thread->throttle_trigger += wait;
943 st->until = thread->throttle_trigger;
945 st->throttling = true;
946 st->is_throttled = true;
948 fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
953 { /* are we sleeping? */
957 INSTR_TIME_SET_CURRENT(now);
958 now_us = INSTR_TIME_GET_MICROSEC(now);
959 if (st->until <= now_us)
961 st->sleeping = 0; /* Done sleeping, go ahead with next command */
964 /* Measure lag of throttled transaction relative to target */
965 int64 lag = now_us - st->until;
967 thread->throttle_lag += lag;
968 if (lag > thread->throttle_lag_max)
969 thread->throttle_lag_max = lag;
970 st->throttling = false;
974 return true; /* Still sleeping, nothing to do here */
978 { /* are we receiver? */
979 if (commands[st->state]->type == SQL_COMMAND)
982 fprintf(stderr, "client %d receiving\n", st->id);
983 if (!PQconsumeInput(st->con))
984 { /* there's something wrong */
985 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
986 return clientDone(st, false);
988 if (PQisBusy(st->con))
989 return true; /* don't have the whole result yet */
993 * command finished: accumulate per-command execution times in
994 * thread-local data structure, if per-command latencies are requested
999 int cnum = commands[st->state]->command_num;
1001 INSTR_TIME_SET_CURRENT(now);
1002 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
1003 now, st->stmt_begin);
1004 thread->exec_count[cnum]++;
1007 /* transaction finished: record latency under progress or throttling */
1008 if ((progress || throttle_delay) && commands[st->state + 1] == NULL)
1013 INSTR_TIME_SET_CURRENT(diff);
1014 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
1015 latency = INSTR_TIME_GET_MICROSEC(diff);
1016 st->txn_latencies += latency;
1019 * XXX In a long benchmark run of high-latency transactions, this
1020 * int64 addition eventually overflows. For example, 100 threads
1021 * running 10s transactions will overflow it in 2.56 hours. With
1022 * a more-typical OLTP workload of .1s transactions, overflow
1023 * would take 256 hours.
1025 st->txn_sqlats += latency * latency;
1029 * if transaction finished, record the time it took in the log
1031 if (logfile && commands[st->state + 1] == NULL)
1038 * write the log entry if this row belongs to the random sample,
1039 * or no sampling rate was given which means log everything.
1041 if (sample_rate == 0.0 ||
1042 pg_erand48(thread->random_state) <= sample_rate)
1044 INSTR_TIME_SET_CURRENT(now);
1046 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
1047 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
1049 /* should we aggregate the results or not? */
1050 if (agg_interval > 0)
1053 * are we still in the same interval? if yes, accumulate
1054 * the values (print them otherwise)
1056 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
1060 agg->sum2 += usec * usec;
1062 /* first in this aggregation interval */
1063 if ((agg->cnt == 1) || (usec < agg->min_duration))
1064 agg->min_duration = usec;
1066 if ((agg->cnt == 1) || (usec > agg->max_duration))
1067 agg->max_duration = usec;
1072 * Loop until we reach the interval of the current
1073 * transaction (and print all the empty intervals in
1076 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
1079 * This is a non-Windows branch (thanks to the
1080 * ifdef in usage), so we don't need to handle
1081 * this in a special way (see below).
1083 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
1091 /* move to the next interval */
1092 agg->start_time = agg->start_time + agg_interval;
1094 /* reset for "no transaction" intervals */
1096 agg->min_duration = 0;
1097 agg->max_duration = 0;
1103 * and now update the reset values (include the
1107 agg->min_duration = usec;
1108 agg->max_duration = usec;
1110 agg->sum2 = usec * usec;
1115 /* no, print raw transactions */
1119 * This is more than we really ought to know about
1122 fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
1123 st->id, st->cnt, usec, st->use_file,
1124 (long) now.tv_sec, (long) now.tv_usec);
1128 * On Windows, instr_time doesn't provide a timestamp
1131 fprintf(logfile, "%d %d %.0f %d 0 0\n",
1132 st->id, st->cnt, usec, st->use_file);
1138 if (commands[st->state]->type == SQL_COMMAND)
1141 * Read and discard the query result; note this is not included in
1142 * the statement latency numbers.
1144 res = PQgetResult(st->con);
1145 switch (PQresultStatus(res))
1147 case PGRES_COMMAND_OK:
1148 case PGRES_TUPLES_OK:
1151 fprintf(stderr, "Client %d aborted in state %d: %s",
1152 st->id, st->state, PQerrorMessage(st->con));
1154 return clientDone(st, false);
1157 discard_response(st);
1160 if (commands[st->state + 1] == NULL)
1169 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1170 return clientDone(st, true); /* exit success */
1173 /* increment state counter */
1175 if (commands[st->state] == NULL)
1178 st->use_file = (int) getrand(thread, 0, num_files - 1);
1179 commands = sql_files[st->use_file];
1180 st->is_throttled = false;
1183 * No transaction is underway anymore, which means there is
1184 * nothing to listen to right now. When throttling rate limits
1185 * are active, a sleep will happen next, as the next transaction
1186 * starts. And then in any case the next SQL command will set
1190 trans_needs_throttle = (throttle_delay > 0);
1194 if (st->con == NULL)
1199 INSTR_TIME_SET_CURRENT(start);
1200 if ((st->con = doConnect()) == NULL)
1202 fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1203 return clientDone(st, false);
1205 INSTR_TIME_SET_CURRENT(end);
1206 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1210 * This ensures that a throttling delay is inserted before proceeding with
1211 * sql commands, after the first transaction. The first transaction
1212 * throttling is performed when first entering doCustom.
1214 if (trans_needs_throttle)
1216 trans_needs_throttle = false;
1220 /* Record transaction start time under logging, progress or throttling */
1221 if ((logfile || progress || throttle_delay) && st->state == 0)
1222 INSTR_TIME_SET_CURRENT(st->txn_begin);
1224 /* Record statement start time if per-command latencies are requested */
1226 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1228 if (commands[st->state]->type == SQL_COMMAND)
1230 const Command *command = commands[st->state];
1233 if (querymode == QUERY_SIMPLE)
1237 sql = pg_strdup(command->argv[0]);
1238 sql = assignVariables(st, sql);
1241 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1242 r = PQsendQuery(st->con, sql);
1245 else if (querymode == QUERY_EXTENDED)
1247 const char *sql = command->argv[0];
1248 const char *params[MAX_ARGS];
1250 getQueryParams(st, command, params);
1253 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1254 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1255 NULL, params, NULL, NULL, 0);
1257 else if (querymode == QUERY_PREPARED)
1259 char name[MAX_PREPARE_NAME];
1260 const char *params[MAX_ARGS];
1262 if (!st->prepared[st->use_file])
1266 for (j = 0; commands[j] != NULL; j++)
1269 char name[MAX_PREPARE_NAME];
1271 if (commands[j]->type != SQL_COMMAND)
1273 preparedStatementName(name, st->use_file, j);
1274 res = PQprepare(st->con, name,
1275 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1276 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1277 fprintf(stderr, "%s", PQerrorMessage(st->con));
1280 st->prepared[st->use_file] = true;
1283 getQueryParams(st, command, params);
1284 preparedStatementName(name, st->use_file, st->state);
1287 fprintf(stderr, "client %d sending %s\n", st->id, name);
1288 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1289 params, NULL, NULL, 0);
1291 else /* unknown sql mode */
1297 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1301 st->listen = 1; /* flags that should be listened */
1303 else if (commands[st->state]->type == META_COMMAND)
1305 int argc = commands[st->state]->argc,
1307 char **argv = commands[st->state]->argv;
1311 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1312 for (i = 1; i < argc; i++)
1313 fprintf(stderr, " %s", argv[i]);
1314 fprintf(stderr, "\n");
1317 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1324 if (*argv[2] == ':')
1326 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1328 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1332 min = strtoint64(var);
1335 min = strtoint64(argv[2]);
1340 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1346 if (*argv[3] == ':')
1348 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1350 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1354 max = strtoint64(var);
1357 max = strtoint64(argv[3]);
1361 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1367 * getrand() needs to be able to subtract max from min and add one
1368 * to the result without overflowing. Since we know max > min, we
1369 * can detect overflow just by checking for a negative result. But
1370 * we must check both that the subtraction doesn't overflow, and
1371 * that adding one to the result doesn't overflow either.
1373 if (max - min < 0 || (max - min) + 1 < 0)
1375 fprintf(stderr, "%s: range too large\n", argv[0]);
1381 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1383 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1385 if (!putVariable(st, argv[0], argv[1], res))
1393 else if (pg_strcasecmp(argv[0], "set") == 0)
1400 if (*argv[2] == ':')
1402 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1404 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1408 ope1 = strtoint64(var);
1411 ope1 = strtoint64(argv[2]);
1414 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1417 if (*argv[4] == ':')
1419 if ((var = getVariable(st, argv[4] + 1)) == NULL)
1421 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1425 ope2 = strtoint64(var);
1428 ope2 = strtoint64(argv[4]);
1430 if (strcmp(argv[3], "+") == 0)
1431 snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1432 else if (strcmp(argv[3], "-") == 0)
1433 snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1434 else if (strcmp(argv[3], "*") == 0)
1435 snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1436 else if (strcmp(argv[3], "/") == 0)
1440 fprintf(stderr, "%s: division by zero\n", argv[0]);
1444 snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1448 fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1454 if (!putVariable(st, argv[0], argv[1], res))
1462 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1468 if (*argv[1] == ':')
1470 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1472 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1479 usec = atoi(argv[1]);
1483 if (pg_strcasecmp(argv[2], "ms") == 0)
1485 else if (pg_strcasecmp(argv[2], "s") == 0)
1491 INSTR_TIME_SET_CURRENT(now);
1492 st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1497 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1499 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1501 if (timer_exceeded) /* timeout */
1502 return clientDone(st, true);
1503 else if (!ret) /* on error */
1508 else /* succeeded */
1511 else if (pg_strcasecmp(argv[0], "shell") == 0)
1513 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1515 if (timer_exceeded) /* timeout */
1516 return clientDone(st, true);
1517 else if (!ret) /* on error */
1522 else /* succeeded */
1531 /* discard connections */
/*
 * disconnect_all: close the libpq connection of each of the first `length`
 * entries of `state` with PQfinish(), then set that entry's con pointer to
 * NULL so no dangling PGconn pointer is left behind in the CState.
 */
1533 disconnect_all(CState *state, int length)
1537 for (i = 0; i < length; i++)
1541 PQfinish(state[i].con);
1542 state[i].con = NULL;
1547 /* create tables and setup data */
/*
 * init: (re)create and populate the pgbench tables over a new connection.
 *
 * Each table in DDLs is dropped if it exists and then recreated:
 *   - UNLOGGED when unlogged_tables is set;
 *   - with the fillfactor option for entries whose declare_fillfactor is set;
 *   - with the --tablespace option when one was given;
 *   - with bigint account IDs once scale reaches SCALE_32BIT_THRESHOLD.
 *
 * Data is then loaded:
 *   - branch and teller rows are inserted in one transaction;
 *   - the naccounts * scale account rows are streamed with COPY in a second
 *     transaction that first truncates pgbench_accounts.
 *
 * Progress is printed every 100000 rows. With -q (use_quiet) it is printed
 * instead about every LOG_STEP_SECONDS.
 *
 * Finally the tables are vacuumed/analyzed, primary keys are added
 * (optionally in index_tablespace), and foreign keys are added.
 *
 * NOTE(review): the guards around the vacuum step (is_no_vacuum) and the
 * foreign-key step (presumably the --foreign-keys flag) are not shown here.
 * Confirm them before relying on this description.
 */
1549 init(bool is_no_vacuum)
1552 * The scale factor at/beyond which 32-bit integers are insufficient for
1553 * storing TPC-B account IDs.
1555 * Although the actual threshold is 21474, we use 20000 because it is easier to
1556 * document and remember, and isn't that far away from the real threshold.
1558 #define SCALE_32BIT_THRESHOLD 20000
1561 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1562 * fields in these table declarations were intended to comply with that.
1563 * The pgbench_accounts table complies with that because the "filler"
1564 * column is set to blank-padded empty string. But for all other tables
1565 * the columns default to NULL and so don't actually take any space. We
1566 * could fix that by giving them non-null default values. However, that
1567 * would completely break comparability of pgbench results with prior
1568 * versions. Since pgbench has never pretended to be fully TPC-B compliant
1569 * anyway, we stick with the historical behavior.
1573 const char *table; /* table name */
1574 const char *smcols; /* column decls if accountIDs are 32 bits */
1575 const char *bigcols; /* column decls if accountIDs are 64 bits */
1576 int declare_fillfactor;
1578 static const struct ddlinfo DDLs[] = {
1581 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1582 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
1587 "tid int not null,bid int,tbalance int,filler char(84)",
1588 "tid int not null,bid int,tbalance int,filler char(84)",
1593 "aid int not null,bid int,abalance int,filler char(84)",
1594 "aid bigint not null,bid int,abalance int,filler char(84)",
1599 "bid int not null,bbalance int,filler char(88)",
1600 "bid int not null,bbalance int,filler char(88)",
1604 static const char *const DDLINDEXes[] = {
1605 "alter table pgbench_branches add primary key (bid)",
1606 "alter table pgbench_tellers add primary key (tid)",
1607 "alter table pgbench_accounts add primary key (aid)"
1609 static const char *const DDLKEYs[] = {
1610 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1611 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1612 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1613 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1614 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1623 /* used to track elapsed time and estimate of the remaining time */
1628 int log_interval = 1;
1630 if ((con = doConnect()) == NULL)
1633 for (i = 0; i < lengthof(DDLs); i++)
1637 const struct ddlinfo *ddl = &DDLs[i];
1640 /* Remove old table, if it exists. */
1641 snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
1642 executeStatement(con, buffer);
1644 /* Construct new create table statement. */
1646 if (ddl->declare_fillfactor)
1647 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1648 " with (fillfactor=%d)", fillfactor);
1649 if (tablespace != NULL)
1651 char *escape_tablespace;
1653 escape_tablespace = PQescapeIdentifier(con, tablespace,
1654 strlen(tablespace));
1655 snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
1656 " tablespace %s", escape_tablespace);
1657 PQfreemem(escape_tablespace);
1660 cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
1662 snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
1663 unlogged_tables ? " unlogged" : "",
1664 ddl->table, cols, opts);
1666 executeStatement(con, buffer);
1669 executeStatement(con, "begin");
1671 for (i = 0; i < nbranches * scale; i++)
1673 /* "filler" column defaults to NULL */
1674 snprintf(sql, sizeof(sql),
1675 "insert into pgbench_branches(bid,bbalance) values(%d,0)",
1677 executeStatement(con, sql);
1680 for (i = 0; i < ntellers * scale; i++)
1682 /* "filler" column defaults to NULL */
1683 snprintf(sql, sizeof(sql),
1684 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1685 i + 1, i / ntellers + 1);
1686 executeStatement(con, sql);
1689 executeStatement(con, "commit");
1692 * fill the pgbench_accounts table with some data
1694 fprintf(stderr, "creating tables...\n");
1696 executeStatement(con, "begin");
1697 executeStatement(con, "truncate pgbench_accounts");
1699 res = PQexec(con, "copy pgbench_accounts from stdin");
1700 if (PQresultStatus(res) != PGRES_COPY_IN)
1702 fprintf(stderr, "%s", PQerrorMessage(con));
1707 INSTR_TIME_SET_CURRENT(start);
1709 for (k = 0; k < (int64) naccounts * scale; k++)
1713 /* "filler" column defaults to blank padded empty string */
1714 snprintf(sql, sizeof(sql),
1715 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
1716 j, k / naccounts + 1, 0);
1717 if (PQputline(con, sql))
1719 fprintf(stderr, "PQputline failed\n");
1724 * If we want to stick with the original logging, print a message each
1725 * 100k inserted rows.
1727 if ((!use_quiet) && (j % 100000 == 0))
1729 INSTR_TIME_SET_CURRENT(diff);
1730 INSTR_TIME_SUBTRACT(diff, start);
1732 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1733 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
1735 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1736 j, (int64) naccounts * scale,
1737 (int) (((int64) j * 100) / (naccounts * (int64) scale)),
1738 elapsed_sec, remaining_sec);
1740 /* let's not call the timing for each row, but only each 100 rows */
1741 else if (use_quiet && (j % 100 == 0))
1743 INSTR_TIME_SET_CURRENT(diff);
1744 INSTR_TIME_SUBTRACT(diff, start);
1746 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1747 remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
1749 /* have we reached the next interval (or end)? */
/*
 * Compare in int64, like the other row-count expressions in this function.
 * Computed in int, scale * naccounts overflows (undefined behavior) once
 * scale passes the 21474 threshold mentioned at the top of init().
 */
1750 if ((j == (int64) scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
1752 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1753 j, (int64) naccounts * scale,
1754 (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
1756 /* skip to the next interval */
1757 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
1762 if (PQputline(con, "\\.\n"))
1764 fprintf(stderr, "very last PQputline failed\n");
1769 fprintf(stderr, "PQendcopy failed\n");
1772 executeStatement(con, "commit");
1777 fprintf(stderr, "vacuum...\n");
1778 executeStatement(con, "vacuum analyze pgbench_branches");
1779 executeStatement(con, "vacuum analyze pgbench_tellers");
1780 executeStatement(con, "vacuum analyze pgbench_accounts");
1781 executeStatement(con, "vacuum analyze pgbench_history");
1787 fprintf(stderr, "set primary keys...\n");
1788 for (i = 0; i < lengthof(DDLINDEXes); i++)
1792 strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
1794 if (index_tablespace != NULL)
1796 char *escape_tablespace;
1798 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1799 strlen(index_tablespace));
1800 snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
1801 " using index tablespace %s", escape_tablespace);
1802 PQfreemem(escape_tablespace);
1805 executeStatement(con, buffer);
1809 * create foreign keys
1813 fprintf(stderr, "set foreign keys...\n");
1814 for (i = 0; i < lengthof(DDLKEYs); i++)
1816 executeStatement(con, DDLKEYs[i]);
1820 fprintf(stderr, "done.\n");
1825 * Parse the raw sql and replace :param to $n.
/*
 * parseQuery works on a pg_strdup() copy of raw_sql.
 *
 * For each ':' that parseVariable() recognizes as the start of a variable
 * name:
 *   - the name is stored in cmd->argv[cmd->argc];
 *   - the reference is replaced in the copy by "$<argc>" via
 *     replaceVariable().
 *
 * process_commands() stores the SQL text in argv[0] before calling this, so
 * placeholders start at $1 and at most MAX_ARGS - 1 of them fit. Exceeding
 * that limit is reported on stderr.
 *
 * NOTE(review): process_commands() uses the return value as a success flag.
 * The statements that advance p past a non-variable ':', increment argc, and
 * store the rewritten text are not shown here.
 */
1828 parseQuery(Command *cmd, const char *raw_sql)
1833 sql = pg_strdup(raw_sql);
1837 while ((p = strchr(p, ':')) != NULL)
1843 name = parseVariable(p, &eaten);
1853 if (cmd->argc >= MAX_ARGS)
1855 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
/* the placeholder number equals the argv slot the variable name goes into */
1859 sprintf(var, "$%d", cmd->argc);
1860 p = replaceVariable(&sql, p, eaten, var);
1862 cmd->argv[cmd->argc] = name;
1870 /* Parse a command; return a Command struct, or NULL if it's a comment */
/*
 * buf holds one script line and is cut at its first newline. Blank lines,
 * and lines whose first non-blank characters are "--", yield NULL.
 *
 * A meta-command line becomes a META_COMMAND:
 *   - the leading character is skipped by strtok(++p, ...);
 *   - the whitespace-separated words are copied into argv[], with argv[0]
 *     holding the command name;
 *   - the argument count is checked per command, and unknown meta-commands
 *     are reported as invalid.
 *
 * Any other line becomes a SQL_COMMAND with the whole text in argv[0]. In the
 * extended and prepared query modes, parseQuery() then rewrites its
 * :variables into $n parameters.
 *
 * Every Command allocated here takes the next command_num from the global
 * num_commands counter. strtok() keeps hidden static state, so this function
 * is not reentrant.
 */
1872 process_commands(char *buf)
1874 const char delim[] = " \f\n\r\t\v";
1876 Command *my_commands;
1881 /* Make the string buf end at the next newline */
1882 if ((p = strchr(buf, '\n')) != NULL)
1885 /* Skip leading whitespace */
1887 while (isspace((unsigned char) *p))
1890 /* If the line is empty or actually a comment, we're done */
1891 if (*p == '\0' || strncmp(p, "--", 2) == 0)
1894 /* Allocate and initialize Command structure */
1895 my_commands = (Command *) pg_malloc(sizeof(Command));
1896 my_commands->line = pg_strdup(buf);
1897 my_commands->command_num = num_commands++;
1898 my_commands->type = 0; /* until set */
1899 my_commands->argc = 0;
1903 my_commands->type = META_COMMAND;
1906 tok = strtok(++p, delim);
1910 my_commands->argv[j++] = pg_strdup(tok);
1911 my_commands->argc++;
1912 tok = strtok(NULL, delim);
1915 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1917 if (my_commands->argc < 4)
1919 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1923 for (j = 4; j < my_commands->argc; j++)
1924 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1925 my_commands->argv[0], my_commands->argv[j]);
1927 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1929 if (my_commands->argc < 3)
1931 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1935 for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1936 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1937 my_commands->argv[0], my_commands->argv[j]);
1939 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1941 if (my_commands->argc < 2)
1943 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1948 * Split argument into number and unit to allow "sleep 1ms" etc.
1949 * We don't have to terminate the number argument with null
1950 * because it will be parsed with atoi, which ignores trailing
1951 * non-digit characters.
1953 if (my_commands->argv[1][0] != ':')
1955 char *c = my_commands->argv[1];
1957 while (isdigit((unsigned char) *c))
1961 my_commands->argv[2] = c;
1962 if (my_commands->argc < 3)
1963 my_commands->argc = 3;
1967 if (my_commands->argc >= 3)
1969 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1970 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1971 pg_strcasecmp(my_commands->argv[2], "s") != 0)
1973 fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1974 my_commands->argv[0], my_commands->argv[2]);
1979 for (j = 3; j < my_commands->argc; j++)
1980 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1981 my_commands->argv[0], my_commands->argv[j]);
1983 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1985 if (my_commands->argc < 3)
1987 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1991 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
/*
 * argc counts argv[0] ("shell") itself, so a command to run exists only
 * when argc >= 2. Testing argc < 1 could never fire in this branch.
 */
1993 if (my_commands->argc < 2)
1995 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
2001 fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
2007 my_commands->type = SQL_COMMAND;
2012 my_commands->argv[0] = pg_strdup(p);
2013 my_commands->argc++;
2015 case QUERY_EXTENDED:
2016 case QUERY_PREPARED:
2017 if (!parseQuery(my_commands, p))
2029 * Read a line from fd, and return it in a malloc'd buffer.
2030 * Return NULL at EOF.
2032 * The buffer will typically be larger than necessary, but we don't care
2033 * in this program, because we'll free it as soon as we've parsed the line.
/*
 * Lines longer than BUFSIZ - 1 characters arrive in several fgets() chunks.
 * Each chunk is appended at offset "used", and buf is enlarged with
 * pg_realloc() after every chunk that does not end in '\n'. Reading stops
 * at a newline or when fgets() returns NULL (EOF or error).
 */
2036 read_line_from_file(FILE *fd)
2038 char tmpbuf[BUFSIZ];
2040 size_t buflen = BUFSIZ;
/*
 * Use pg_malloc, not palloc: the buffer is resized with pg_realloc below
 * and is promised to callers as malloc'd, so it must come from the same
 * allocator family.
 */
2043 buf = (char *) pg_malloc(buflen);
2046 while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
2048 size_t thislen = strlen(tmpbuf);
2050 /* Append tmpbuf to whatever we had already */
2051 memcpy(buf + used, tmpbuf, thislen + 1);
2054 /* Done if we collected a newline */
2055 if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
2058 /* Else, enlarge buf to ensure we can append next bufferload */
2060 buf = (char *) pg_realloc(buf, buflen);
/*
 * process_file: load a custom transaction script given with -f.
 *
 * The filename "-" is handled specially; presumably it selects stdin, but
 * that assignment is not shown. Otherwise the file is opened with fopen(),
 * and a failure is reported on stderr with strerror(errno).
 *
 * Processing steps:
 *   - each line read by read_line_from_file() is turned into a Command by
 *     process_commands();
 *   - NULL results (blank or comment lines) are skipped;
 *   - the rest are collected in an array that starts with
 *     COMMANDS_ALLOC_NUM slots and grows by the same amount;
 *   - the array is NULL-terminated and registered as
 *     sql_files[num_files++].
 *
 * At most MAX_FILES scripts are accepted. main() compares the result with
 * false, so it serves as a success flag.
 *
 * NOTE(review): my_commands is allocated before the fopen() check. If the
 * error path returns without freeing it, that allocation leaks. Confirm.
 */
2072 process_file(char *filename)
2074 #define COMMANDS_ALLOC_NUM 128
2076 Command **my_commands;
2082 if (num_files >= MAX_FILES)
2084 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
2088 alloc_num = COMMANDS_ALLOC_NUM;
2089 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2091 if (strcmp(filename, "-") == 0)
2093 else if ((fd = fopen(filename, "r")) == NULL)
2095 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
2101 while ((buf = read_line_from_file(fd)) != NULL)
2105 command = process_commands(buf);
2109 if (command == NULL)
2112 my_commands[lineno] = command;
/*
 * Growing as soon as lineno reaches alloc_num keeps a free slot for the
 * NULL terminator stored after the loop.
 */
2115 if (lineno >= alloc_num)
2117 alloc_num += COMMANDS_ALLOC_NUM;
2118 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2123 my_commands[lineno] = NULL;
2125 sql_files[num_files++] = my_commands;
/*
 * process_builtin: parse a built-in script held in the string tb.
 *
 * The loop "while (*tb && *tb != '\n')" scans tb one line at a time. Each
 * line, assembled in buf, is handed to process_commands(). Non-NULL Commands
 * are collected into an array that starts with COMMANDS_ALLOC_NUM slots,
 * grows by the same amount, and is NULL-terminated. main() stores the result
 * in sql_files[0].
 *
 * COMMANDS_ALLOC_NUM repeats the identical definition used by
 * process_file(); an identical macro redefinition is valid C.
 */
2131 process_builtin(char *tb)
2133 #define COMMANDS_ALLOC_NUM 128
2135 Command **my_commands;
2140 alloc_num = COMMANDS_ALLOC_NUM;
2141 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
2151 while (*tb && *tb != '\n')
2162 command = process_commands(buf);
2163 if (command == NULL)
2166 my_commands[lineno] = command;
/* grow early so there is always room for the trailing NULL terminator */
2169 if (lineno >= alloc_num)
2171 alloc_num += COMMANDS_ALLOC_NUM;
2172 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2176 my_commands[lineno] = NULL;
2181 /* print out results */
/*
 * printResults: print the end-of-run summary on stdout.
 *
 * Parameters:
 *   - normal_xacts: completed transactions summed over all clients.
 *   - total_time: wall-clock length of the run.
 *   - conn_total_time: summed per-thread connection setup time. It is
 *     averaged over nthreads and subtracted from total_time for the
 *     "excluding connections establishing" tps figure.
 *   - total_latencies / total_sqlats: sums of per-transaction latencies and
 *     of their squares, in microseconds (the 0.001 factors convert to ms).
 *     They are only reported when throttling or progress reporting was
 *     enabled.
 *   - throttle_lag / throttle_lag_max: schedule lag totals under --rate.
 *
 * Per-statement average latencies are printed from each thread's
 * exec_elapsed/exec_count arrays. Presumably this happens only when -r
 * (is_latencies) is set; the guard is not shown here.
 */
2183 printResults(int ttype, int64 normal_xacts, int nclients,
2184 TState *threads, int nthreads,
2185 instr_time total_time, instr_time conn_total_time,
2186 int64 total_latencies, int64 total_sqlats,
2187 int64 throttle_lag, int64 throttle_lag_max)
2189 double time_include,
2194 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2195 tps_include = normal_xacts / time_include;
2196 tps_exclude = normal_xacts / (time_include -
2197 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2200 s = "TPC-B (sort of)";
2201 else if (ttype == 2)
2202 s = "Update only pgbench_accounts";
2203 else if (ttype == 1)
2208 printf("transaction type: %s\n", s);
2209 printf("scaling factor: %d\n", scale);
2210 printf("query mode: %s\n", QUERYMODE[querymode]);
2211 printf("number of clients: %d\n", nclients);
2212 printf("number of threads: %d\n", nthreads);
2215 printf("number of transactions per client: %d\n", nxacts);
2216 printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
2217 normal_xacts, (int64) nxacts * nclients);
2221 printf("duration: %d s\n", duration);
2222 printf("number of transactions actually processed: " INT64_FORMAT "\n",
2226 if (throttle_delay || progress)
2228 /* compute and show latency average and standard deviation */
2229 double latency = 0.001 * total_latencies / normal_xacts;
2230 double sqlat = (double) total_sqlats / normal_xacts;
/*
 * NOTE(review): sqlat - 1e6 * latency^2 is E[x^2] - E[x]^2 in us^2. When
 * latencies are nearly constant it can round to a slightly negative value,
 * and sqrt() then returns NaN. Consider clamping at zero.
 */
2232 printf("latency average: %.3f ms\n"
2233 "latency stddev: %.3f ms\n",
2234 latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
2238 /* only an average latency computed from the elapsed run time is available */
/*
 * Use the measured run time (time_include), not duration. duration is only
 * set by -T, so in -t mode it is not positive and the printed average was
 * meaningless.
 */
2239 printf("latency average: %.3f ms\n",
2240 1000.0 * time_include * nclients / normal_xacts);
2246 * Report average transaction lag under rate limit throttling. This
2247 * is the delay between scheduled and actual start times for the
2248 * transaction. The measured lag may be caused by thread/client load,
2249 * the database load, or the Poisson throttling process.
2251 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
2252 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
2255 printf("tps = %f (including connections establishing)\n", tps_include);
2256 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2258 /* Report per-command latencies */
2263 for (i = 0; i < num_files; i++)
2268 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2270 printf("statement latencies in milliseconds:\n");
2272 for (commands = sql_files[i]; *commands != NULL; commands++)
2274 Command *command = *commands;
2275 int cnum = command->command_num;
2277 instr_time total_exec_elapsed;
2278 int total_exec_count;
2281 /* Accumulate per-thread data for command */
2282 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2283 total_exec_count = 0;
2284 for (t = 0; t < nthreads; t++)
2286 TState *thread = &threads[t];
2288 INSTR_TIME_ADD(total_exec_elapsed,
2289 thread->exec_elapsed[cnum]);
2290 total_exec_count += thread->exec_count[cnum];
2293 if (total_exec_count > 0)
2294 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2298 printf("\t%f\t%s\n", total_time, command->line);
/*
 * main: pgbench entry point.
 *
 * Connection settings:
 *   - defaults come from PGHOST, PGPORT and PGUSER, and can be overridden
 *     by -h, -p and -U;
 *   - the database name is the first non-option argument, else PGDATABASE,
 *     else (presumably) the login name.
 *
 * After getopt_long() processing, incompatible option combinations are
 * rejected. A benchmark run then:
 *   - grows the CState array to nclients entries and copies the -D
 *     variables to every client;
 *   - defines :scale and :client_id unless they were set with -D;
 *   - reads the scale from pgbench_branches when not running a custom
 *     script;
 *   - optionally vacuums the tables;
 *   - loads the built-in or -f scripts;
 *   - starts nthreads workers, each driving nclients / nthreads clients
 *     (worker 0 runs on the main thread).
 *
 * The workers' TResult totals are summed and passed to printResults().
 *
 * NOTE(review): the -i (is_init_mode) branch that calls init() is not shown
 * here.
 */
2306 main(int argc, char **argv)
2308 static struct option long_options[] = {
2309 /* systematic long/short named options */
2310 {"client", required_argument, NULL, 'c'},
2311 {"connect", no_argument, NULL, 'C'},
2312 {"debug", no_argument, NULL, 'd'},
2313 {"define", required_argument, NULL, 'D'},
2314 {"file", required_argument, NULL, 'f'},
2315 {"fillfactor", required_argument, NULL, 'F'},
2316 {"host", required_argument, NULL, 'h'},
2317 {"initialize", no_argument, NULL, 'i'},
2318 {"jobs", required_argument, NULL, 'j'},
2319 {"log", no_argument, NULL, 'l'},
2320 {"no-vacuum", no_argument, NULL, 'n'},
2321 {"port", required_argument, NULL, 'p'},
2322 {"progress", required_argument, NULL, 'P'},
2323 {"protocol", required_argument, NULL, 'M'},
2324 {"quiet", no_argument, NULL, 'q'},
2325 {"report-latencies", no_argument, NULL, 'r'},
2326 {"scale", required_argument, NULL, 's'},
2327 {"select-only", no_argument, NULL, 'S'},
2328 {"skip-some-updates", no_argument, NULL, 'N'},
2329 {"time", required_argument, NULL, 'T'},
2330 {"transactions", required_argument, NULL, 't'},
2331 {"username", required_argument, NULL, 'U'},
2332 {"vacuum-all", no_argument, NULL, 'v'},
2333 /* long-named only options */
2334 {"foreign-keys", no_argument, &foreign_keys, 1},
2335 {"index-tablespace", required_argument, NULL, 3},
2336 {"tablespace", required_argument, NULL, 2},
2337 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2338 {"sampling-rate", required_argument, NULL, 4},
2339 {"aggregate-interval", required_argument, NULL, 5},
2340 {"rate", required_argument, NULL, 'R'},
2345 int nclients = 1; /* default number of simulated clients */
2346 int nthreads = 1; /* default number of threads */
2347 int is_init_mode = 0; /* initialize mode? */
2348 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2349 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2350 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2351 * 2: skip update of branches and tellers */
2353 char *filename = NULL;
2354 bool scale_given = false;
2356 CState *state; /* status of clients */
2357 TState *threads; /* array of thread */
2359 instr_time start_time; /* start up time */
2360 instr_time total_time;
2361 instr_time conn_total_time;
2362 int64 total_xacts = 0;
2363 int64 total_latencies = 0;
2364 int64 total_sqlats = 0;
2365 int64 throttle_lag = 0;
2366 int64 throttle_lag_max = 0;
2370 #ifdef HAVE_GETRLIMIT
2380 progname = get_progname(argv[0]);
2384 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2389 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2391 puts("pgbench (PostgreSQL) " PG_VERSION);
2397 /* stderr is buffered on Win32. */
2398 setvbuf(stderr, NULL, _IONBF, 0);
2401 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2403 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
/* PGUSER is independent of PGPORT, so it must not be an "else if" of it. */
2405 if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2408 state = (CState *) pg_malloc(sizeof(CState));
2409 memset(state, 0, sizeof(CState));
2411 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
2419 pghost = pg_strdup(optarg);
2425 do_vacuum_accounts++;
2428 pgport = pg_strdup(optarg);
2440 nclients = atoi(optarg);
2441 if (nclients <= 0 || nclients > MAXCLIENTS)
2443 fprintf(stderr, "invalid number of clients: %d\n", nclients);
2446 #ifdef HAVE_GETRLIMIT
2447 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2448 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2449 #else /* but BSD doesn't ... */
2450 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2451 #endif /* RLIMIT_NOFILE */
2453 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
/*
 * At least nclients + 3 descriptors are required; this is the same test as
 * the previous "<= nclients + 2". Report the number actually demanded.
 */
2456 if (rlim.rlim_cur < (nclients + 3))
2458 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 3, (long) rlim.rlim_cur);
2459 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2462 #endif /* HAVE_GETRLIMIT */
2464 case 'j': /* jobs */
2465 nthreads = atoi(optarg);
2468 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2476 is_latencies = true;
2480 scale = atoi(optarg);
2483 fprintf(stderr, "invalid scaling factor: %d\n", scale);
2490 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2493 nxacts = atoi(optarg);
2496 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2503 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2506 duration = atoi(optarg);
2509 fprintf(stderr, "invalid duration: %d\n", duration);
2514 login = pg_strdup(optarg);
2524 filename = pg_strdup(optarg);
2525 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2532 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2534 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2539 if (!putVariable(&state[0], "option", optarg, p))
2544 fillfactor = atoi(optarg);
2545 if ((fillfactor < 10) || (fillfactor > 100))
2547 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2554 fprintf(stderr, "query mode (-M) should be specified before transaction scripts (-f)\n");
2557 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2558 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2560 if (querymode >= NUM_QUERYMODE)
2562 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2567 progress = atoi(optarg);
2571 "thread progress delay (-P) must be positive (%s)\n",
2578 /* get a double from the beginning of option value */
2579 double throttle_value = atof(optarg);
2581 if (throttle_value <= 0.0)
2583 fprintf(stderr, "invalid rate limit: %s\n", optarg);
2586 /* Invert rate limit into a time offset */
2587 throttle_delay = (int64) (1000000.0 / throttle_value);
2591 /* This covers long options which take no argument. */
2593 case 2: /* tablespace */
2594 tablespace = pg_strdup(optarg);
2596 case 3: /* index-tablespace */
2597 index_tablespace = pg_strdup(optarg);
2600 sample_rate = atof(optarg);
2601 if (sample_rate <= 0.0 || sample_rate > 1.0)
2603 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2609 fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
2612 agg_interval = atoi(optarg);
2613 if (agg_interval <= 0)
2615 fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
2621 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2627 /* compute a per thread delay */
2628 throttle_delay *= nthreads;
2631 dbName = argv[optind];
2634 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2636 else if (login != NULL && *login != '\0')
2648 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2649 if (nxacts <= 0 && duration <= 0)
2650 nxacts = DEFAULT_NXACTS;
2652 if (nclients % nthreads != 0)
2654 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2658 /* --sampling-rate may be used only with -l */
2659 if (sample_rate > 0.0 && !use_log)
2661 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2665 /* -q may be used only with -i */
2666 if (use_quiet && !is_init_mode)
2668 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2672 /* --sampling-rate must not be used with --aggregate-interval */
2673 if (sample_rate > 0.0 && agg_interval > 0)
2675 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
2679 if (agg_interval > 0 && (!use_log))
2681 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
2685 if ((duration > 0) && (agg_interval > duration))
2687 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
2691 if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
2693 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
2698 * is_latencies only works with multiple threads in thread-based
2699 * implementations, not fork-based ones, because it supposes that the
2700 * parent can see changes made to the per-thread execution stats by child
2701 * threads. It seems useful enough to accept despite this limitation, but
2702 * perhaps we should FIXME someday (by passing the stats data back up
2703 * through the parent-to-child pipes).
2705 #ifndef ENABLE_THREAD_SAFETY
2706 if (is_latencies && nthreads > 1)
2708 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2714 * save main process id in the global variable because process id will be
2715 * changed after fork.
2717 main_pid = (int) getpid();
2718 progress_nclients = nclients;
2719 progress_nthreads = nthreads;
2723 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2724 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2726 /* copy any -D switch values to all clients */
2727 for (i = 1; i < nclients; i++)
2732 for (j = 0; j < state[0].nvariables; j++)
2734 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2743 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2744 pghost, pgport, nclients, nxacts, dbName);
2746 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2747 pghost, pgport, nclients, duration, dbName);
2750 /* opening connection... */
2755 if (PQstatus(con) == CONNECTION_BAD)
2757 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2758 fprintf(stderr, "%s", PQerrorMessage(con));
2765 * get the scaling factor that should be same as count(*) from
2766 * pgbench_branches if this is not a custom query
2768 res = PQexec(con, "select count(*) from pgbench_branches");
2769 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2771 fprintf(stderr, "%s", PQerrorMessage(con));
2774 scale = atoi(PQgetvalue(res, 0, 0));
2777 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2782 /* warn if we override user-given -s switch */
2785 "Scale option ignored, using pgbench_branches table count = %d\n",
2790 * :scale variables normally get -s or database scale, but don't override
2791 * an explicit -D switch
2793 if (getVariable(&state[0], "scale") == NULL)
2795 snprintf(val, sizeof(val), "%d", scale);
2796 for (i = 0; i < nclients; i++)
2798 if (!putVariable(&state[i], "startup", "scale", val))
2804 * Define a :client_id variable that is unique per connection. But don't
2805 * override an explicit -D switch.
2807 if (getVariable(&state[0], "client_id") == NULL)
2809 for (i = 0; i < nclients; i++)
2811 snprintf(val, sizeof(val), "%d", i);
2812 if (!putVariable(&state[i], "startup", "client_id", val))
2819 fprintf(stderr, "starting vacuum...");
2820 executeStatement(con, "vacuum pgbench_branches");
2821 executeStatement(con, "vacuum pgbench_tellers");
2822 executeStatement(con, "truncate pgbench_history");
2823 fprintf(stderr, "end.\n");
2825 if (do_vacuum_accounts)
2827 fprintf(stderr, "starting vacuum pgbench_accounts...");
2828 executeStatement(con, "vacuum analyze pgbench_accounts");
2829 fprintf(stderr, "end.\n");
2834 /* set random seed */
2835 INSTR_TIME_SET_CURRENT(start_time);
2836 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2838 /* process builtin SQL scripts */
2842 sql_files[0] = process_builtin(tpc_b);
2847 sql_files[0] = process_builtin(select_only);
2852 sql_files[0] = process_builtin(simple_update);
2860 /* set up thread data structures */
2861 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2862 for (i = 0; i < nthreads; i++)
2864 TState *thread = &threads[i];
2867 thread->state = &state[nclients / nthreads * i];
2868 thread->nstate = nclients / nthreads;
2869 thread->random_state[0] = random();
2870 thread->random_state[1] = random();
2871 thread->random_state[2] = random();
2875 /* Reserve memory for the thread to store per-command latencies */
2878 thread->exec_elapsed = (instr_time *)
2879 pg_malloc(sizeof(instr_time) * num_commands);
2880 thread->exec_count = (int *)
2881 pg_malloc(sizeof(int) * num_commands);
2883 for (t = 0; t < num_commands; t++)
2885 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2886 thread->exec_count[t] = 0;
2891 thread->exec_elapsed = NULL;
2892 thread->exec_count = NULL;
2896 /* get start up time */
2897 INSTR_TIME_SET_CURRENT(start_time);
2899 /* set alarm if duration is specified. */
2904 for (i = 0; i < nthreads; i++)
2906 TState *thread = &threads[i];
2908 INSTR_TIME_SET_CURRENT(thread->start_time);
2910 /* the first thread (i = 0) is executed by main thread */
2913 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
2915 if (err != 0 || thread->thread == INVALID_THREAD)
2917 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2923 thread->thread = INVALID_THREAD;
2927 /* wait for threads and accumulate results */
2928 INSTR_TIME_SET_ZERO(conn_total_time);
2929 for (i = 0; i < nthreads; i++)
2933 if (threads[i].thread == INVALID_THREAD)
2934 ret = threadRun(&threads[i]);
2936 pthread_join(threads[i].thread, &ret);
2940 TResult *r = (TResult *) ret;
2942 total_xacts += r->xacts;
2943 total_latencies += r->latencies;
2944 total_sqlats += r->sqlats;
2945 throttle_lag += r->throttle_lag;
2946 if (r->throttle_lag_max > throttle_lag_max)
2947 throttle_lag_max = r->throttle_lag_max;
2948 INSTR_TIME_ADD(conn_total_time, r->conn_time);
2952 disconnect_all(state, nclients);
2955 * XXX We compute results as though every client of every thread started
2956 * and finished at the same time. That model can diverge noticeably from
2957 * reality for a short benchmark run involving relatively many threads.
2958 * The first thread may process notably many transactions before the last
2959 * thread begins. Improving the model alone would bring limited benefit,
2960 * because performance during those periods of partial thread count can
2961 * easily exceed steady state performance. This is one of the many ways
2962 * short runs convey deceptive performance figures.
2964 INSTR_TIME_SET_CURRENT(total_time);
2965 INSTR_TIME_SUBTRACT(total_time, start_time);
2966 printResults(ttype, total_xacts, nclients, threads, nthreads,
2967 total_time, conn_total_time, total_latencies, total_sqlats,
2968 throttle_lag, throttle_lag_max);
/*
 * threadRun: body of one pgbench worker thread.
 *
 * arg is the thread's TState.  The thread connects its nstate clients
 * (thread->state[0 .. nstate-1]), optionally opens a per-thread log file
 * ("pgbench_log.<pid>" for tid 0, "pgbench_log.<pid>.<tid>" otherwise),
 * then drives the clients with doCustom(), using select() to wait for
 * socket activity or the nearest sleep deadline.  Finally it disconnects
 * and fills a pg_malloc'd TResult with the summed transaction counts,
 * latencies and throttle lag; the caller treats the return value as a
 * TResult *.
 *
 * NOTE(review): this chunk is a lossy listing -- the return-type line,
 * braces, loop headers and other short statements are not visible, so the
 * comments below describe only what the visible lines show.
 */
2974 threadRun(void *arg)
2976 TState *thread = (TState *) arg;
2977 CState *state = thread->state;
2979 FILE *logfile = NULL; /* per-thread log file */
2982 int nstate = thread->nstate;
2983 int remains = nstate; /* number of remaining clients */
2986 /* for reporting progress: */
/* Times below are in microseconds; progress is presumably the report interval in seconds. */
2987 int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
2988 int64 last_report = thread_start;
2989 int64 next_report = last_report + (int64) progress * 1000000;
2990 int64 last_count = 0,
2998 * Initialize throttling rate target for all of the thread's clients. It
2999 * might be a little more accurate to reset thread->start_time here too.
3000 * The possible drift seems too small relative to typical throttle delay
3001 * times to worry about it.
3003 INSTR_TIME_SET_CURRENT(start);
3004 thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
3005 thread->throttle_lag = 0;
3006 thread->throttle_lag_max = 0;
3008 result = pg_malloc(sizeof(TResult));
3010 INSTR_TIME_SET_ZERO(result->conn_time);
3012 /* open log file if requested */
3017 if (thread->tid == 0)
3018 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
3020 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
3021 logfile = fopen(logpath, "w");
3023 if (logfile == NULL)
/*
 * NOTE(review): unlike the other stderr messages in this function, this
 * format string has no trailing newline; suggest ending it with "\n".
 */
3025 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
3032 /* make connections to the database */
3033 for (i = 0; i < nstate; i++)
3035 if ((state[i].con = doConnect()) == NULL)
3040 /* time after thread and connections set up */
3041 INSTR_TIME_SET_CURRENT(result->conn_time);
3042 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
3044 agg_vals_init(&aggs, thread->start_time);
3046 /* send start up queries in async manner */
3047 for (i = 0; i < nstate; i++)
3049 CState *st = &state[i];
/*
 * NOTE(review): `commands` is taken from sql_files[st->use_file] before
 * st->use_file is re-drawn by getrand() below, so the META_COMMAND check
 * after doCustom() may consult a different script than the one this
 * client actually started; if the scripts differ in length, st->state
 * could even index past the end of that other script.  Suggested fix:
 * declare `Command **commands;` here and assign
 * `commands = sql_files[st->use_file];` right after the getrand() call.
 */
3050 Command **commands = sql_files[st->use_file];
3051 int prev_ecnt = st->ecnt;
3053 st->use_file = getrand(thread, 0, num_files - 1);
3054 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3055 remains--; /* I've aborted */
3057 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3059 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
3060 remains--; /* I've aborted */
/*
 * Main loop (its header is not visible in this listing; presumably it runs
 * while remains > 0): collect client sockets into input_mask and compute
 * in min_usec the shortest time (usec) until some client must run.
 */
3069 int maxsock; /* max socket number to be waited */
3073 FD_ZERO(&input_mask);
/* INT64_MAX means "no deadline": select() is then called without a timeout. */
3076 min_usec = INT64_MAX;
3077 for (i = 0; i < nstate; i++)
3079 CState *st = &state[i];
3080 Command **commands = sql_files[st->use_file];
3083 if (st->con == NULL)
3087 else if (st->sleeping)
3089 if (st->throttling && timer_exceeded)
3091 /* interrupt client which has not started a transaction */
3094 st->throttling = false;
3099 else /* just a nap from the script */
3103 if (min_usec == INT64_MAX)
3107 INSTR_TIME_SET_CURRENT(now);
3108 now_usec = INSTR_TIME_GET_MICROSEC(now);
3111 this_usec = st->until - now_usec;
3112 if (min_usec > this_usec)
3113 min_usec = this_usec;
3116 else if (commands[st->state]->type == META_COMMAND)
3118 min_usec = 0; /* the connection is ready to run */
3122 sock = PQsocket(st->con);
3125 fprintf(stderr, "bad socket: %s\n", strerror(errno));
3129 FD_SET(sock, &input_mask);
/*
 * Block in select() only when no client can run immediately
 * (min_usec > 0) and at least one socket is being watched.
 */
3135 if (min_usec > 0 && maxsock != -1)
3137 int nsocks; /* return from select(2) */
3139 if (min_usec != INT64_MAX)
3141 struct timeval timeout;
3143 timeout.tv_sec = min_usec / 1000000;
3144 timeout.tv_usec = min_usec % 1000000;
3145 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
3148 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
3153 /* must be something wrong */
3154 fprintf(stderr, "select failed: %s\n", strerror(errno));
3159 /* ok, backend returns reply */
3160 for (i = 0; i < nstate; i++)
3162 CState *st = &state[i];
/*
 * NOTE(review): as in the start-up loop, `commands` is captured before
 * doCustom() runs; if doCustom() can switch st->use_file (its body is not
 * visible here), the META_COMMAND check below may look at a stale script.
 * TODO confirm.
 */
3163 Command **commands = sql_files[st->use_file];
3164 int prev_ecnt = st->ecnt;
3166 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
3167 || commands[st->state]->type == META_COMMAND))
3169 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
3170 remains--; /* I've aborted */
3173 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
3175 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
3176 remains--; /* I've aborted */
3182 #ifdef PTHREAD_FORK_EMULATION
3183 /* each process reports its own progression */
3186 instr_time now_time;
3189 INSTR_TIME_SET_CURRENT(now_time);
3190 now = INSTR_TIME_GET_MICROSEC(now_time);
3191 if (now >= next_report)
3193 /* generate and show report */
3197 int64 lags = thread->throttle_lag;
3198 int64 run = now - last_report;
3206 for (i = 0; i < nstate; i++)
3208 count += state[i].cnt;
3209 lats += state[i].txn_latencies;
3210 sqlats += state[i].txn_sqlats;
3213 total_run = (now - thread_start) / 1000000.0;
3214 tps = 1000000.0 * (count - last_count) / run;
/*
 * NOTE(review): if no transaction completed since the last report,
 * count - last_count is 0; these are floating-point divisions (the
 * numerators are doubles), so the report prints inf/nan rather than
 * trapping.  Consider guarding that case.
 */
3215 latency = 0.001 * (lats - last_lats) / (count - last_count);
3216 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3217 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3218 lag = 0.001 * (lags - last_lags) / (count - last_count);
3222 "progress %d: %.1f s, %.1f tps, "
3223 "lat %.3f ms stddev %.3f, lag %.3f ms\n",
3224 thread->tid, total_run, tps, latency, stdev, lag);
3227 "progress %d: %.1f s, %.1f tps, "
3228 "lat %.3f ms stddev %.3f\n",
3229 thread->tid, total_run, tps, latency, stdev);
3233 last_sqlats = sqlats;
3236 next_report += (int64) progress *1000000;
3240 /* progress report by thread 0 for all threads */
3241 if (progress && thread->tid == 0)
3243 instr_time now_time;
3246 INSTR_TIME_SET_CURRENT(now_time);
3247 now = INSTR_TIME_GET_MICROSEC(now_time);
3248 if (now >= next_report)
3250 /* generate and show report */
3255 int64 run = now - last_report;
3263 for (i = 0; i < progress_nclients; i++)
3265 count += state[i].cnt;
3266 lats += state[i].txn_latencies;
3267 sqlats += state[i].txn_sqlats;
/*
 * NOTE(review): thread[i] indexes the TState array starting from this
 * thread.  That covers every thread only because this branch runs in
 * thread 0, the first element of the array main() starts threads from;
 * state[] is presumably likewise the start of the global client array.
 * Other threads' counters are read with no visible locking, so the
 * figures are approximate.
 */
3270 for (i = 0; i < progress_nthreads; i++)
3271 lags += thread[i].throttle_lag;
3273 total_run = (now - thread_start) / 1000000.0;
3274 tps = 1000000.0 * (count - last_count) / run;
/*
 * NOTE(review): same zero-divisor case as above when count == last_count
 * (prints inf/nan).
 */
3275 latency = 0.001 * (lats - last_lats) / (count - last_count);
3276 sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
3277 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
3278 lag = 0.001 * (lags - last_lags) / (count - last_count);
3282 "progress: %.1f s, %.1f tps, "
3283 "lat %.3f ms stddev %.3f, lag %.3f ms\n",
3284 total_run, tps, latency, stdev, lag);
3287 "progress: %.1f s, %.1f tps, "
3288 "lat %.3f ms stddev %.3f\n",
3289 total_run, tps, latency, stdev);
3293 last_sqlats = sqlats;
3296 next_report += (int64) progress *1000000;
3299 #endif /* PTHREAD_FORK_EMULATION */
/*
 * Summarize: disconnect all clients and fold their counters into result.
 * The start..end interval measured around this also adds the disconnect
 * (and summation) time to result->conn_time.
 *
 * NOTE(review): only result->latencies is visibly reset here, while xacts
 * and sqlats are also accumulated with += into pg_malloc'd memory; confirm
 * they are zeroed too (short lines are missing from this listing).
 */
3303 INSTR_TIME_SET_CURRENT(start);
3304 disconnect_all(state, nstate);
3306 result->latencies = 0;
3308 for (i = 0; i < nstate; i++)
3310 result->xacts += state[i].cnt;
3311 result->latencies += state[i].txn_latencies;
3312 result->sqlats += state[i].txn_sqlats;
3314 result->throttle_lag = thread->throttle_lag;
3315 result->throttle_lag_max = thread->throttle_lag_max;
3316 INSTR_TIME_SET_CURRENT(end);
3317 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
3324 * Support for duration option: set timer_exceeded after so many seconds.
/*
 * SIGALRM handler for the duration option: it only sets the global
 * timer_exceeded flag, which the client loop in threadRun() tests.
 *
 * NOTE(review): a flag written from a signal handler should be declared
 * volatile (ideally volatile sig_atomic_t); its declaration is not visible
 * here -- confirm.
 */
3330 handle_sig_alarm(SIGNAL_ARGS)
3332 timer_exceeded = true;
/*
 * setalarm (POSIX): install handle_sig_alarm() as the SIGALRM handler.
 * NOTE(review): the call that arms the timer for `seconds` (presumably
 * alarm(seconds)) is not visible in this listing -- confirm.
 */
3336 setalarm(int seconds)
3338 pqsignal(SIGALRM, handle_sig_alarm);
3342 #ifndef ENABLE_THREAD_SAFETY
3345 * implements pthread using fork.
/*
 * Handle for a "thread" emulated with fork(): pthread_create() and
 * pthread_join() below use its pid and pipes members (the member
 * declarations are not visible in this listing).
 */
3348 typedef struct fork_pthread
/*
 * pthread_create() emulation using fork(): allocates a fork_pthread and a
 * pipe, then starts a child process (th->pid; the fork() call itself is
 * not visible in this listing).  The parent closes the pipe's write end.
 * The child closes the read end, re-arms the duration alarm (per the
 * comment below; timers are not inherited by the child), runs
 * start_routine(arg), and writes the sizeof(TResult) bytes its return
 * value points to into the pipe for pthread_join() to read.
 *
 * NOTE(review): this assumes start_routine returns a non-NULL pointer to a
 * TResult.  The write() result is stored in rc, but no check of it is
 * visible here; a short or failed write only surfaces as a NULL result
 * from pthread_join().
 */
3355 pthread_create(pthread_t *thread,
3356 pthread_attr_t *attr,
3357 void *(*start_routine) (void *),
3364 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
3365 if (pipe(th->pipes) < 0)
3372 if (th->pid == -1) /* error */
3377 if (th->pid != 0) /* in parent process */
3379 close(th->pipes[1]);
3384 /* in child process */
3385 close(th->pipes[0]);
3387 /* set alarm again because the child does not inherit timers */
3391 ret = start_routine(arg);
3392 rc = write(th->pipes[1], ret, sizeof(TResult));
3394 close(th->pipes[1]);
/*
 * pthread_join() emulation for forked "threads": waits for the child with
 * waitpid() (the loop body is not visible here; presumably it retries on
 * EINTR).  Then, if thread_return is non-NULL, it reads one TResult from
 * the pipe into a new pg_malloc'd buffer; on a short read the buffer is
 * freed and *thread_return is set to NULL.  The pipe's read end is closed
 * afterwards.
 */
3400 pthread_join(pthread_t th, void **thread_return)
3404 while (waitpid(th->pid, &status, 0) != th->pid)
3410 if (thread_return != NULL)
3412 /* assume result is TResult */
3413 *thread_return = pg_malloc(sizeof(TResult));
3414 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3416 free(*thread_return);
3417 *thread_return = NULL;
3420 close(th->pipes[0]);
/*
 * Timer-queue callback registered by the Win32 setalarm(): sets
 * timer_exceeded, the same flag the POSIX SIGALRM handler sets.
 */
3428 static VOID CALLBACK
3429 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3431 timer_exceeded = true;
/*
 * setalarm (Win32): create a timer queue and schedule
 * win32_timer_callback() to fire once (WT_EXECUTEONLYONCE, period 0) after
 * `seconds` seconds, which sets timer_exceeded.  It prints "Failed to set
 * timer" when the delay in milliseconds would not fit in a DWORD, or when
 * CreateTimerQueueTimer() fails.
 *
 * NOTE(review): `seconds * 1000` is evaluated in int.  Values above
 * INT_MAX / 1000 (about 24.8 days) pass the DWORD range check but overflow
 * signed int, which is undefined behavior; suggested fix: pass
 * `(DWORD) seconds * 1000`.  No check of the CreateTimerQueue() result is
 * visible in this listing -- confirm.
 */
3435 setalarm(int seconds)
3440 /* This function will be called at most once, so we can cheat a bit. */
3441 queue = CreateTimerQueue();
3442 if (seconds > ((DWORD) -1) / 1000 ||
3443 !CreateTimerQueueTimer(&timer, queue,
3444 win32_timer_callback, NULL, seconds * 1000, 0,
3445 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3447 fprintf(stderr, "Failed to set timer\n");
3452 /* partial pthread implementation for Windows */
/*
 * Per-thread state for the Win32 pthread shim.  Besides `routine` (the
 * function to run), the code below also uses handle, arg and result
 * members, whose declarations are not visible in this listing.
 */
3454 typedef struct win32_pthread
3457 void *(*routine) (void *);
/*
 * Thread entry point handed to _beginthreadex(): runs th->routine(th->arg)
 * and stores its return value in th->result, where pthread_join() picks
 * it up.
 */
3462 static unsigned __stdcall
3463 win32_pthread_run(void *arg)
3465 win32_pthread *th = (win32_pthread *) arg;
3467 th->result = th->routine(th->arg);
/*
 * pthread_create() for Win32: allocates a win32_pthread, records
 * start_routine, and starts win32_pthread_run(th) with _beginthreadex().
 * A NULL handle means the thread could not be started.  The assignments
 * of arg/result and the failure handling are not visible in this listing.
 */
3473 pthread_create(pthread_t *thread,
3474 pthread_attr_t *attr,
3475 void *(*start_routine) (void *),
3481 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3482 th->routine = start_routine;
3486 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3487 if (th->handle == NULL)
/*
 * pthread_join() for Win32.  It rejects a NULL thread or NULL handle with
 * EINVAL (also stored in errno) and waits indefinitely for the thread.
 * A wait failure is mapped to errno via _dosmaperr(GetLastError()).
 * Otherwise it hands back th->result through thread_return and closes the
 * thread handle.
 * NOTE(review): short lines (any NULL check on thread_return, the return
 * statements, freeing of th) are missing from this listing -- confirm
 * against the full source.
 */
3499 pthread_join(pthread_t th, void **thread_return)
3501 if (th == NULL || th->handle == NULL)
3502 return errno = EINVAL;
3504 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3506 _dosmaperr(GetLastError());
3511 *thread_return = th->result;
3513 CloseHandle(th->handle);