4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * contrib/pgbench/pgbench.c
8 * Copyright (c) 2000-2013, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "portability/instr_time.h"
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
53 #ifdef HAVE_SYS_RESOURCE_H
54 #include <sys/resource.h> /* for getrlimit */
58 #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
62 * Multi-platform pthread implementations
66 /* Use native win32 threads on Windows */
67 typedef struct win32_pthread *pthread_t;
68 typedef int pthread_attr_t;
70 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
71 static int pthread_join(pthread_t th, void **thread_return);
72 #elif defined(ENABLE_THREAD_SAFETY)
73 /* Use platform-dependent pthread capability */
76 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
80 #define pthread_t pg_pthread_t
81 #define pthread_attr_t pg_pthread_attr_t
82 #define pthread_create pg_pthread_create
83 #define pthread_join pg_pthread_join
85 typedef struct fork_pthread *pthread_t;
86 typedef int pthread_attr_t;
88 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
89 static int pthread_join(pthread_t th, void **thread_return);
96 /********************************************************************
97 * some configurable parameters */
99 /* max number of clients allowed */
101 #define MAXCLIENTS (FD_SETSIZE - 10)
103 #define MAXCLIENTS 1024
106 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
107 #define DEFAULT_NXACTS 10 /* default nxacts */
109 int nxacts = 0; /* number of transactions per client */
110 int duration = 0; /* duration in seconds */
113 * scaling factor. for example, scale = 10 will make 1000000 tuples in
114 * pgbench_accounts table.
119 * fillfactor. for example, fillfactor = 90 will use only 90 percent
120 * space during inserts and leave 10 percent free.
122 int fillfactor = 100;
125 * create foreign key constraints on the tables?
127 int foreign_keys = 0;
130 * use unlogged tables?
132 int unlogged_tables = 0;
135 * log sampling rate (1.0 = log everything, 0.0 = option not given)
137 double sample_rate = 0.0;
140 * tablespace selection
142 char *tablespace = NULL;
143 char *index_tablespace = NULL;
146 * end of configurable parameters
147 *********************************************************************/
149 #define nbranches 1 /* Makes little sense to change this. Change
152 #define naccounts 100000
155 * The scale factor at/beyond which 32bit integers are incapable of storing
158 * Although the actual threshold is 21474, we use 20000 because it is easier to
159 * document and remember, and isn't that far away from the real threshold.
161 #define SCALE_32BIT_THRESHOLD 20000
163 bool use_log; /* log transaction latencies to a file */
164 bool use_quiet; /* quiet logging onto stderr */
165 int agg_interval; /* log aggregates instead of individual
167 bool is_connect; /* establish connection for each transaction */
168 bool is_latencies; /* report per-command latencies */
169 int main_pid; /* main process id used in log filename */
175 const char *progname;
177 volatile bool timer_exceeded = false; /* flag from signal handler */
179 /* variable definitions */
182 char *name; /* variable name */
183 char *value; /* its value */
186 #define MAX_FILES 128 /* max number of SQL script files allowed */
187 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
190 * structures used in custom query mode
195 PGconn *con; /* connection handle to DB */
196 int id; /* client No. */
197 int state; /* state No. */
198 int cnt; /* xacts count */
199 int ecnt; /* error count */
200 int listen; /* 0 indicates that an async query has been
202 int sleeping; /* 1 indicates that the client is napping */
203 int64 until; /* napping until (usec) */
204 Variable *variables; /* array of variable definitions */
206 instr_time txn_begin; /* used for measuring transaction latencies */
207 instr_time stmt_begin; /* used for measuring statement latencies */
208 int use_file; /* index in sql_files for this client */
209 bool prepared[MAX_FILES];
213 * Thread state and result
217 int tid; /* thread id */
218 pthread_t thread; /* thread handle */
219 CState *state; /* array of CState */
220 int nstate; /* length of state[] */
221 instr_time start_time; /* thread start time */
222 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
223 int *exec_count; /* number of cmd executions (per Command) */
224 unsigned short random_state[3]; /* separate randomness for each thread */
227 #define INVALID_THREAD ((pthread_t) 0)
231 instr_time conn_time;
236 * queries read from files
238 #define SQL_COMMAND 1
239 #define META_COMMAND 2
242 typedef enum QueryMode
244 QUERY_SIMPLE, /* simple query */
245 QUERY_EXTENDED, /* extended query */
246 QUERY_PREPARED, /* extended query with prepared statements */
250 static QueryMode querymode = QUERY_SIMPLE;
251 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
255 char *line; /* full text of command line */
256 int command_num; /* unique index of this Command struct */
257 int type; /* command type (SQL_COMMAND or META_COMMAND) */
258 int argc; /* number of command words */
259 char *argv[MAX_ARGS]; /* command word list */
265 long start_time; /* when does the interval start */
266 int cnt; /* number of transactions */
267 double min_duration; /* min/max durations */
269 double sum; /* sum(duration), sum(duration^2) - for
275 static Command **sql_files[MAX_FILES]; /* SQL script files */
276 static int num_files; /* number of script files */
277 static int num_commands = 0; /* total number of Command structs */
278 static int debug = 0; /* debug flag */
280 /* default scenario */
281 static char *tpc_b = {
282 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
283 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
284 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
285 "\\setrandom aid 1 :naccounts\n"
286 "\\setrandom bid 1 :nbranches\n"
287 "\\setrandom tid 1 :ntellers\n"
288 "\\setrandom delta -5000 5000\n"
290 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
291 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
292 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
293 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
294 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
299 static char *simple_update = {
300 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
301 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
302 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
303 "\\setrandom aid 1 :naccounts\n"
304 "\\setrandom bid 1 :nbranches\n"
305 "\\setrandom tid 1 :ntellers\n"
306 "\\setrandom delta -5000 5000\n"
308 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
309 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
310 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
315 static char *select_only = {
316 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
317 "\\setrandom aid 1 :naccounts\n"
318 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
321 /* Function prototypes */
322 static void setalarm(int seconds);
323 static void *threadRun(void *arg);
328 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
330 " %s [OPTION]... [DBNAME]\n"
331 "\nInitialization options:\n"
332 " -i, --initialize invokes initialization mode\n"
333 " -F, --fillfactor=NUM set fill factor\n"
334 " -n, --no-vacuum do not run VACUUM after initialization\n"
335 " -q, --quiet quiet logging (one message each 5 seconds)\n"
336 " -s, --scale=NUM scaling factor\n"
337 " --foreign-keys create foreign key constraints between tables\n"
338 " --index-tablespace=TABLESPACE\n"
339 " create indexes in the specified tablespace\n"
340 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
341 " --unlogged-tables create tables as unlogged tables\n"
342 "\nBenchmarking options:\n"
343 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
344 " -C, --connect establish new connection for each transaction\n"
345 " -D, --define=VARNAME=VALUE\n"
346 " define variable for use by custom script\n"
347 " -f, --file=FILENAME read transaction script from FILENAME\n"
348 " -j, --jobs=NUM number of threads (default: 1)\n"
349 " -l, --log write transaction times to log file\n"
350 " -M, --protocol=simple|extended|prepared\n"
351 " protocol for submitting queries "
352 "(default: simple)\n"
353 " -n, --no-vacuum do not run VACUUM before tests\n"
354 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
355 " -r, --report-latencies report average latency per command\n"
356 " -s, --scale=NUM report this scale factor in output\n"
357 " -S, --select-only perform SELECT-only transactions\n"
358 " -t, --transactions number of transactions each client runs "
360 " -T, --time=NUM duration of benchmark test in seconds\n"
361 " -v, --vacuum-all vacuum all four standard tables before tests\n"
362 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
363 " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
364 "\nCommon options:\n"
365 " -d, --debug print debugging output\n"
366 " -h, --host=HOSTNAME database server host or socket directory\n"
367 " -p, --port=PORT database server port number\n"
368 " -U, --username=USERNAME connect as specified database user\n"
369 " -V, --version output version information, then exit\n"
370 " -?, --help show this help, then exit\n"
372 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
377 * strtoint64 -- convert a string to 64-bit integer
379 * This function is a modified version of scanint8() from
380 * src/backend/utils/adt/int8.c.
383 strtoint64(const char *str)
385 const char *ptr = str;
390 * Do our own scan, rather than relying on sscanf which might be broken
394 /* skip leading spaces */
395 while (*ptr && isspace((unsigned char) *ptr))
404 * Do an explicit check for INT64_MIN. Ugly though this is, it's
405 * cleaner than trying to get the loop below to handle it portably.
407 if (strncmp(ptr, "9223372036854775808", 19) == 0)
409 result = -INT64CONST(0x7fffffffffffffff) - 1;
415 else if (*ptr == '+')
418 /* require at least one digit */
419 if (!isdigit((unsigned char) *ptr))
420 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
423 while (*ptr && isdigit((unsigned char) *ptr))
425 int64 tmp = result * 10 + (*ptr++ - '0');
427 if ((tmp / 10) != result) /* overflow? */
428 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
434 /* allow trailing whitespace, but not other trailing chars */
435 while (*ptr != '\0' && isspace((unsigned char) *ptr))
439 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
441 return ((sign < 0) ? -result : result);
444 /* random number generator: uniform distribution from min to max inclusive */
446 getrand(TState *thread, int64 min, int64 max)
449 * Odd coding is so that min and max have approximately the same chance of
450 * being selected as do numbers between them.
452 * pg_erand48() is thread-safe and concurrent, which is why we use it
453 * rather than random(), which in glibc is non-reentrant, and therefore
454 * protected by a mutex, and therefore a bottleneck on machines with many
457 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
460 /* call PQexec() and exit() on failure */
462 executeStatement(PGconn *con, const char *sql)
466 res = PQexec(con, sql);
467 if (PQresultStatus(res) != PGRES_COMMAND_OK)
469 fprintf(stderr, "%s", PQerrorMessage(con));
475 /* set up a connection to the backend */
480 static char *password = NULL;
484 * Start the connection. Loop until we have a password if requested by
489 #define PARAMS_ARRAY_SIZE 7
491 const char *keywords[PARAMS_ARRAY_SIZE];
492 const char *values[PARAMS_ARRAY_SIZE];
494 keywords[0] = "host";
496 keywords[1] = "port";
498 keywords[2] = "user";
500 keywords[3] = "password";
501 values[3] = password;
502 keywords[4] = "dbname";
504 keywords[5] = "fallback_application_name";
505 values[5] = progname;
511 conn = PQconnectdbParams(keywords, values, true);
515 fprintf(stderr, "Connection to database \"%s\" failed\n",
520 if (PQstatus(conn) == CONNECTION_BAD &&
521 PQconnectionNeedsPassword(conn) &&
525 password = simple_prompt("Password: ", 100, false);
530 /* check to see that the backend connection was successfully made */
531 if (PQstatus(conn) == CONNECTION_BAD)
533 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
534 dbName, PQerrorMessage(conn));
542 /* throw away response from backend */
544 discard_response(CState *state)
550 res = PQgetResult(state->con);
557 compareVariables(const void *v1, const void *v2)
559 return strcmp(((const Variable *) v1)->name,
560 ((const Variable *) v2)->name);
564 getVariable(CState *st, char *name)
569 /* On some versions of Solaris, bsearch of zero items dumps core */
570 if (st->nvariables <= 0)
574 var = (Variable *) bsearch((void *) &key,
575 (void *) st->variables,
585 /* check whether the name consists of alphabets, numerals and underscores. */
587 isLegalVariableName(const char *name)
591 for (i = 0; name[i] != '\0'; i++)
593 if (!isalnum((unsigned char) name[i]) && name[i] != '_')
601 putVariable(CState *st, const char *context, char *name, char *value)
607 /* On some versions of Solaris, bsearch of zero items dumps core */
608 if (st->nvariables > 0)
609 var = (Variable *) bsearch((void *) &key,
610 (void *) st->variables,
622 * Check for the name only when declaring a new variable to avoid
625 if (!isLegalVariableName(name))
627 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
632 newvars = (Variable *) pg_realloc(st->variables,
633 (st->nvariables + 1) * sizeof(Variable));
635 newvars = (Variable *) pg_malloc(sizeof(Variable));
637 st->variables = newvars;
639 var = &newvars[st->nvariables];
641 var->name = pg_strdup(name);
642 var->value = pg_strdup(value);
646 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
653 /* dup then free, in case value is pointing at this variable */
654 val = pg_strdup(value);
664 parseVariable(const char *sql, int *eaten)
672 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
677 memcpy(name, &sql[1], i - 1);
685 replaceVariable(char **sql, char *param, int len, char *value)
687 int valueln = strlen(value);
691 size_t offset = param - *sql;
693 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
694 param = *sql + offset;
698 memmove(param + valueln, param + len, strlen(param + len) + 1);
699 strncpy(param, value, valueln);
701 return param + valueln;
705 assignVariables(CState *st, char *sql)
712 while ((p = strchr(p, ':')) != NULL)
716 name = parseVariable(p, &eaten);
726 val = getVariable(st, name);
734 p = replaceVariable(&sql, p, eaten, val);
741 getQueryParams(CState *st, const Command *command, const char **params)
745 for (i = 0; i < command->argc - 1; i++)
746 params[i] = getVariable(st, command->argv[i + 1]);
750 * Run a shell command. The result is assigned to the variable if not NULL.
751 * Return true if succeeded, or false on error.
754 runShellCommand(CState *st, char *variable, char **argv, int argc)
756 char command[SHELL_COMMAND_SIZE];
765 * Join arguments with whitespace separators. Arguments starting with
766 * exactly one colon are treated as variables:
767 * name - append a string "name"
768 * :var - append a variable named 'var'
769 * ::name - append a string ":name"
772 for (i = 0; i < argc; i++)
777 if (argv[i][0] != ':')
779 arg = argv[i]; /* a string literal */
781 else if (argv[i][1] == ':')
783 arg = argv[i] + 1; /* a string literal starting with colons */
785 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
787 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
791 arglen = strlen(arg);
792 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
794 fprintf(stderr, "%s: too long shell command\n", argv[0]);
799 command[len++] = ' ';
800 memcpy(command + len, arg, arglen);
806 /* Fast path for non-assignment case */
807 if (variable == NULL)
812 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
818 /* Execute the command with pipe and read the standard output. */
819 if ((fp = popen(command, "r")) == NULL)
821 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
824 if (fgets(res, sizeof(res), fp) == NULL)
827 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
832 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
836 /* Check whether the result is an integer and assign it to the variable */
837 retval = (int) strtol(res, &endptr, 10);
838 while (*endptr != '\0' && isspace((unsigned char) *endptr))
840 if (*res == '\0' || *endptr != '\0')
842 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
845 snprintf(res, sizeof(res), "%d", retval);
846 if (!putVariable(st, "setshell", variable, res))
850 printf("shell parameter name: %s, value: %s\n", argv[1], res);
855 #define MAX_PREPARE_NAME 32
857 preparedStatementName(char *buffer, int file, int state)
859 sprintf(buffer, "P%d_%d", file, state);
863 clientDone(CState *st, bool ok)
865 (void) ok; /* unused */
872 return false; /* always false */
877 agg_vals_init(AggVals *aggs, instr_time start)
880 aggs->cnt = 0; /* number of transactions */
881 aggs->sum = 0; /* SUM(duration) */
882 aggs->sum2 = 0; /* SUM(duration*duration) */
884 /* min and max transaction duration */
885 aggs->min_duration = 0;
886 aggs->max_duration = 0;
888 /* start of the current interval */
889 aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
892 /* return false iff client should be disconnected */
894 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
900 commands = sql_files[st->use_file];
903 { /* are we sleeping? */
906 INSTR_TIME_SET_CURRENT(now);
907 if (st->until <= INSTR_TIME_GET_MICROSEC(now))
908 st->sleeping = 0; /* Done sleeping, go ahead with next command */
910 return true; /* Still sleeping, nothing to do here */
914 { /* are we receiver? */
915 if (commands[st->state]->type == SQL_COMMAND)
918 fprintf(stderr, "client %d receiving\n", st->id);
919 if (!PQconsumeInput(st->con))
920 { /* there's something wrong */
921 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
922 return clientDone(st, false);
924 if (PQisBusy(st->con))
925 return true; /* don't have the whole result yet */
929 * command finished: accumulate per-command execution times in
930 * thread-local data structure, if per-command latencies are requested
935 int cnum = commands[st->state]->command_num;
937 INSTR_TIME_SET_CURRENT(now);
938 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
939 now, st->stmt_begin);
940 thread->exec_count[cnum]++;
944 * if transaction finished, record the time it took in the log
946 if (logfile && commands[st->state + 1] == NULL)
953 * write the log entry if this row belongs to the random sample,
954 * or no sampling rate was given which means log everything.
956 if (sample_rate == 0.0 ||
957 pg_erand48(thread->random_state) <= sample_rate)
959 INSTR_TIME_SET_CURRENT(now);
961 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
962 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
964 /* should we aggregate the results or not? */
965 if (agg_interval > 0)
968 * are we still in the same interval? if yes, accumulate
969 * the values (print them otherwise)
971 if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
975 agg->sum2 += usec * usec;
977 /* first in this aggregation interval */
978 if ((agg->cnt == 1) || (usec < agg->min_duration))
979 agg->min_duration = usec;
981 if ((agg->cnt == 1) || (usec > agg->max_duration))
982 agg->max_duration = usec;
987 * Loop until we reach the interval of the current
988 * transaction (and print all the empty intervals in
991 while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
994 * This is a non-Windows branch (thanks to the
995 * ifdef in usage), so we don't need to handle
996 * this in a special way (see below).
998 fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
1006 /* move to the next inteval */
1007 agg->start_time = agg->start_time + agg_interval;
1009 /* reset for "no transaction" intervals */
1011 agg->min_duration = 0;
1012 agg->max_duration = 0;
1018 * and now update the reset values (include the
1022 agg->min_duration = usec;
1023 agg->max_duration = usec;
1025 agg->sum2 = usec * usec;
1030 /* no, print raw transactions */
1034 * This is more than we really ought to know about
1037 fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
1038 st->id, st->cnt, usec, st->use_file,
1039 (long) now.tv_sec, (long) now.tv_usec);
1043 * On Windows, instr_time doesn't provide a timestamp
1046 fprintf(logfile, "%d %d %.0f %d 0 0\n",
1047 st->id, st->cnt, usec, st->use_file);
1053 if (commands[st->state]->type == SQL_COMMAND)
1056 * Read and discard the query result; note this is not included in
1057 * the statement latency numbers.
1059 res = PQgetResult(st->con);
1060 switch (PQresultStatus(res))
1062 case PGRES_COMMAND_OK:
1063 case PGRES_TUPLES_OK:
1066 fprintf(stderr, "Client %d aborted in state %d: %s",
1067 st->id, st->state, PQerrorMessage(st->con));
1069 return clientDone(st, false);
1072 discard_response(st);
1075 if (commands[st->state + 1] == NULL)
1084 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1085 return clientDone(st, true); /* exit success */
1088 /* increment state counter */
1090 if (commands[st->state] == NULL)
1093 st->use_file = (int) getrand(thread, 0, num_files - 1);
1094 commands = sql_files[st->use_file];
1098 if (st->con == NULL)
1103 INSTR_TIME_SET_CURRENT(start);
1104 if ((st->con = doConnect()) == NULL)
1106 fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1107 return clientDone(st, false);
1109 INSTR_TIME_SET_CURRENT(end);
1110 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1113 /* Record transaction start time if logging is enabled */
1114 if (logfile && st->state == 0)
1115 INSTR_TIME_SET_CURRENT(st->txn_begin);
1117 /* Record statement start time if per-command latencies are requested */
1119 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1121 if (commands[st->state]->type == SQL_COMMAND)
1123 const Command *command = commands[st->state];
1126 if (querymode == QUERY_SIMPLE)
1130 sql = pg_strdup(command->argv[0]);
1131 sql = assignVariables(st, sql);
1134 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1135 r = PQsendQuery(st->con, sql);
1138 else if (querymode == QUERY_EXTENDED)
1140 const char *sql = command->argv[0];
1141 const char *params[MAX_ARGS];
1143 getQueryParams(st, command, params);
1146 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1147 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1148 NULL, params, NULL, NULL, 0);
1150 else if (querymode == QUERY_PREPARED)
1152 char name[MAX_PREPARE_NAME];
1153 const char *params[MAX_ARGS];
1155 if (!st->prepared[st->use_file])
1159 for (j = 0; commands[j] != NULL; j++)
1162 char name[MAX_PREPARE_NAME];
1164 if (commands[j]->type != SQL_COMMAND)
1166 preparedStatementName(name, st->use_file, j);
1167 res = PQprepare(st->con, name,
1168 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1169 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1170 fprintf(stderr, "%s", PQerrorMessage(st->con));
1173 st->prepared[st->use_file] = true;
1176 getQueryParams(st, command, params);
1177 preparedStatementName(name, st->use_file, st->state);
1180 fprintf(stderr, "client %d sending %s\n", st->id, name);
1181 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1182 params, NULL, NULL, 0);
1184 else /* unknown sql mode */
1190 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1194 st->listen = 1; /* flags that should be listened */
1196 else if (commands[st->state]->type == META_COMMAND)
1198 int argc = commands[st->state]->argc,
1200 char **argv = commands[st->state]->argv;
1204 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1205 for (i = 1; i < argc; i++)
1206 fprintf(stderr, " %s", argv[i]);
1207 fprintf(stderr, "\n");
1210 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1217 if (*argv[2] == ':')
1219 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1221 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1225 min = strtoint64(var);
1228 min = strtoint64(argv[2]);
1233 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1239 if (*argv[3] == ':')
1241 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1243 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1247 max = strtoint64(var);
1250 max = strtoint64(argv[3]);
1254 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1260 * getrand() needs to be able to subtract max from min and add one
1261 * to the result without overflowing. Since we know max > min, we
1262 * can detect overflow just by checking for a negative result. But
1263 * we must check both that the subtraction doesn't overflow, and
1264 * that adding one to the result doesn't overflow either.
1266 if (max - min < 0 || (max - min) + 1 < 0)
1268 fprintf(stderr, "%s: range too large\n", argv[0]);
1274 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1276 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1278 if (!putVariable(st, argv[0], argv[1], res))
1286 else if (pg_strcasecmp(argv[0], "set") == 0)
1293 if (*argv[2] == ':')
1295 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1297 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1301 ope1 = strtoint64(var);
1304 ope1 = strtoint64(argv[2]);
1307 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1310 if (*argv[4] == ':')
1312 if ((var = getVariable(st, argv[4] + 1)) == NULL)
1314 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1318 ope2 = strtoint64(var);
1321 ope2 = strtoint64(argv[4]);
1323 if (strcmp(argv[3], "+") == 0)
1324 snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1325 else if (strcmp(argv[3], "-") == 0)
1326 snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1327 else if (strcmp(argv[3], "*") == 0)
1328 snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1329 else if (strcmp(argv[3], "/") == 0)
1333 fprintf(stderr, "%s: division by zero\n", argv[0]);
1337 snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1341 fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1347 if (!putVariable(st, argv[0], argv[1], res))
1355 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1361 if (*argv[1] == ':')
1363 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1365 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1372 usec = atoi(argv[1]);
1376 if (pg_strcasecmp(argv[2], "ms") == 0)
1378 else if (pg_strcasecmp(argv[2], "s") == 0)
1384 INSTR_TIME_SET_CURRENT(now);
1385 st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1390 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1392 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1394 if (timer_exceeded) /* timeout */
1395 return clientDone(st, true);
1396 else if (!ret) /* on error */
1401 else /* succeeded */
1404 else if (pg_strcasecmp(argv[0], "shell") == 0)
1406 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1408 if (timer_exceeded) /* timeout */
1409 return clientDone(st, true);
1410 else if (!ret) /* on error */
1415 else /* succeeded */
1424 /* discard connections */
1426 disconnect_all(CState *state, int length)
1430 for (i = 0; i < length; i++)
1434 PQfinish(state[i].con);
1435 state[i].con = NULL;
1440 /* create tables and setup data */
1442 init(bool is_no_vacuum)
1444 /* The scale factor at/beyond which 32bit integers are incapable of storing
1447 * Although the actual threshold is 21474, we use 20000 because it is easier to
1448 * document and remember, and isn't that far away from the real threshold.
1450 #define SCALE_32BIT_THRESHOLD 20000
1453 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1454 * fields in these table declarations were intended to comply with that.
1455 * But because they default to NULLs, they don't actually take any space.
1456 * We could fix that by giving them non-null default values. However, that
1457 * would completely break comparability of pgbench results with prior
1458 * versions. Since pgbench has never pretended to be fully TPC-B
1459 * compliant anyway, we stick with the historical behavior.
1465 int declare_fillfactor;
1467 struct ddlinfo DDLs[] = {
1470 scale >= SCALE_32BIT_THRESHOLD
1471 ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
1472 : "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1477 "tid int not null,bid int,tbalance int,filler char(84)",
1482 scale >= SCALE_32BIT_THRESHOLD
1483 ? "aid bigint not null,bid int,abalance int,filler char(84)"
1484 : "aid int not null,bid int,abalance int,filler char(84)",
1489 "bid int not null,bbalance int,filler char(88)",
1493 static char *DDLAFTERs[] = {
1494 "alter table pgbench_branches add primary key (bid)",
1495 "alter table pgbench_tellers add primary key (tid)",
1496 "alter table pgbench_accounts add primary key (aid)"
1498 static char *DDLKEYs[] = {
1499 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1500 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1501 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1502 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1503 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1512 /* used to track elapsed time and estimate of the remaining time */
1517 int log_interval = 1;
1519 if ((con = doConnect()) == NULL)
1522 for (i = 0; i < lengthof(DDLs); i++)
1526 struct ddlinfo *ddl = &DDLs[i];
1528 /* Remove old table, if it exists. */
1529 snprintf(buffer, 256, "drop table if exists %s", ddl->table);
1530 executeStatement(con, buffer);
1532 /* Construct new create table statement. */
1534 if (ddl->declare_fillfactor)
1535 snprintf(opts + strlen(opts), 256 - strlen(opts),
1536 " with (fillfactor=%d)", fillfactor);
1537 if (tablespace != NULL)
1539 char *escape_tablespace;
1541 escape_tablespace = PQescapeIdentifier(con, tablespace,
1542 strlen(tablespace));
1543 snprintf(opts + strlen(opts), 256 - strlen(opts),
1544 " tablespace %s", escape_tablespace);
1545 PQfreemem(escape_tablespace);
1547 snprintf(buffer, 256, "create%s table %s(%s)%s",
1548 unlogged_tables ? " unlogged" : "",
1549 ddl->table, ddl->cols, opts);
1551 executeStatement(con, buffer);
1554 executeStatement(con, "begin");
1556 for (i = 0; i < nbranches * scale; i++)
1558 snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
1559 executeStatement(con, sql);
1562 for (i = 0; i < ntellers * scale; i++)
1564 snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1565 i + 1, i / ntellers + 1);
1566 executeStatement(con, sql);
1569 executeStatement(con, "commit");
1572 * fill the pgbench_accounts table with some data
1574 fprintf(stderr, "creating tables...\n");
1576 executeStatement(con, "begin");
1577 executeStatement(con, "truncate pgbench_accounts");
1579 res = PQexec(con, "copy pgbench_accounts from stdin");
1580 if (PQresultStatus(res) != PGRES_COPY_IN)
1582 fprintf(stderr, "%s", PQerrorMessage(con));
1587 INSTR_TIME_SET_CURRENT(start);
1589 for (k = 0; k < (int64) naccounts * scale; k++)
1593 snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
1594 if (PQputline(con, sql))
1596 fprintf(stderr, "PQputline failed\n");
1601 * If we want to stick with the original logging, print a message each
1602 * 100k inserted rows.
1604 if ((!use_quiet) && (j % 100000 == 0))
1606 INSTR_TIME_SET_CURRENT(diff);
1607 INSTR_TIME_SUBTRACT(diff, start);
1609 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1610 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1612 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1613 j, (int64) naccounts * scale,
1614 (int) (((int64) j * 100) / (naccounts * scale)),
1615 elapsed_sec, remaining_sec);
1617 /* let's not call the timing for each row, but only each 100 rows */
1618 else if (use_quiet && (j % 100 == 0))
1620 INSTR_TIME_SET_CURRENT(diff);
1621 INSTR_TIME_SUBTRACT(diff, start);
1623 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1624 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1626 /* have we reached the next interval (or end)? */
1627 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
1629 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1630 j, (int64) naccounts * scale,
1631 (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
1633 /* skip to the next interval */
1634 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
1639 if (PQputline(con, "\\.\n"))
1641 fprintf(stderr, "very last PQputline failed\n");
1646 fprintf(stderr, "PQendcopy failed\n");
1649 executeStatement(con, "commit");
1654 fprintf(stderr, "vacuum...\n");
1655 executeStatement(con, "vacuum analyze pgbench_branches");
1656 executeStatement(con, "vacuum analyze pgbench_tellers");
1657 executeStatement(con, "vacuum analyze pgbench_accounts");
1658 executeStatement(con, "vacuum analyze pgbench_history");
1664 fprintf(stderr, "set primary keys...\n");
1665 for (i = 0; i < lengthof(DDLAFTERs); i++)
1669 strncpy(buffer, DDLAFTERs[i], 256);
1671 if (index_tablespace != NULL)
1673 char *escape_tablespace;
1675 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1676 strlen(index_tablespace));
1677 snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
1678 " using index tablespace %s", escape_tablespace);
1679 PQfreemem(escape_tablespace);
1682 executeStatement(con, buffer);
1686 * create foreign keys
1690 fprintf(stderr, "set foreign keys...\n");
1691 for (i = 0; i < lengthof(DDLKEYs); i++)
1693 executeStatement(con, DDLKEYs[i]);
1698 fprintf(stderr, "done.\n");
1703 * Parse the raw sql and replace :param to $n.
1706 parseQuery(Command *cmd, const char *raw_sql)
1711 sql = pg_strdup(raw_sql);
1715 while ((p = strchr(p, ':')) != NULL)
1721 name = parseVariable(p, &eaten);
1731 if (cmd->argc >= MAX_ARGS)
1733 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
1737 sprintf(var, "$%d", cmd->argc);
1738 p = replaceVariable(&sql, p, eaten, var);
1740 cmd->argv[cmd->argc] = name;
1748 /* Parse a command; return a Command struct, or NULL if it's a comment */
1750 process_commands(char *buf)
1752 const char delim[] = " \f\n\r\t\v";
1754 Command *my_commands;
1759 /* Make the string buf end at the next newline */
1760 if ((p = strchr(buf, '\n')) != NULL)
1763 /* Skip leading whitespace */
1765 while (isspace((unsigned char) *p))
1768 /* If the line is empty or actually a comment, we're done */
1769 if (*p == '\0' || strncmp(p, "--", 2) == 0)
1772 /* Allocate and initialize Command structure */
1773 my_commands = (Command *) pg_malloc(sizeof(Command));
1774 my_commands->line = pg_strdup(buf);
1775 my_commands->command_num = num_commands++;
1776 my_commands->type = 0; /* until set */
1777 my_commands->argc = 0;
1781 my_commands->type = META_COMMAND;
1784 tok = strtok(++p, delim);
1788 my_commands->argv[j++] = pg_strdup(tok);
1789 my_commands->argc++;
1790 tok = strtok(NULL, delim);
1793 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1795 if (my_commands->argc < 4)
1797 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1801 for (j = 4; j < my_commands->argc; j++)
1802 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1803 my_commands->argv[0], my_commands->argv[j]);
1805 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1807 if (my_commands->argc < 3)
1809 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1813 for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1814 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1815 my_commands->argv[0], my_commands->argv[j]);
1817 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1819 if (my_commands->argc < 2)
1821 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1826 * Split argument into number and unit to allow "sleep 1ms" etc.
1827 * We don't have to terminate the number argument with null
1828 * because it will be parsed with atoi, which ignores trailing
1829 * non-digit characters.
1831 if (my_commands->argv[1][0] != ':')
1833 char *c = my_commands->argv[1];
1835 while (isdigit((unsigned char) *c))
1839 my_commands->argv[2] = c;
1840 if (my_commands->argc < 3)
1841 my_commands->argc = 3;
1845 if (my_commands->argc >= 3)
1847 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1848 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1849 pg_strcasecmp(my_commands->argv[2], "s") != 0)
1851 fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1852 my_commands->argv[0], my_commands->argv[2]);
1857 for (j = 3; j < my_commands->argc; j++)
1858 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1859 my_commands->argv[0], my_commands->argv[j]);
1861 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1863 if (my_commands->argc < 3)
1865 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1869 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
1871 if (my_commands->argc < 1)
1873 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
1879 fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
1885 my_commands->type = SQL_COMMAND;
1890 my_commands->argv[0] = pg_strdup(p);
1891 my_commands->argc++;
1893 case QUERY_EXTENDED:
1894 case QUERY_PREPARED:
1895 if (!parseQuery(my_commands, p))
1907 process_file(char *filename)
1909 #define COMMANDS_ALLOC_NUM 128
1911 Command **my_commands;
1917 if (num_files >= MAX_FILES)
1919 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
1923 alloc_num = COMMANDS_ALLOC_NUM;
1924 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1926 if (strcmp(filename, "-") == 0)
1928 else if ((fd = fopen(filename, "r")) == NULL)
1930 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
1936 while (fgets(buf, sizeof(buf), fd) != NULL)
1940 command = process_commands(buf);
1941 if (command == NULL)
1944 my_commands[lineno] = command;
1947 if (lineno >= alloc_num)
1949 alloc_num += COMMANDS_ALLOC_NUM;
1950 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1955 my_commands[lineno] = NULL;
1957 sql_files[num_files++] = my_commands;
1963 process_builtin(char *tb)
1965 #define COMMANDS_ALLOC_NUM 128
1967 Command **my_commands;
1972 alloc_num = COMMANDS_ALLOC_NUM;
1973 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1983 while (*tb && *tb != '\n')
1994 command = process_commands(buf);
1995 if (command == NULL)
1998 my_commands[lineno] = command;
2001 if (lineno >= alloc_num)
2003 alloc_num += COMMANDS_ALLOC_NUM;
2004 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
2008 my_commands[lineno] = NULL;
2013 /* print out results */
2015 printResults(int ttype, int normal_xacts, int nclients,
2016 TState *threads, int nthreads,
2017 instr_time total_time, instr_time conn_total_time)
2019 double time_include,
2024 time_include = INSTR_TIME_GET_DOUBLE(total_time);
2025 tps_include = normal_xacts / time_include;
2026 tps_exclude = normal_xacts / (time_include -
2027 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
2030 s = "TPC-B (sort of)";
2031 else if (ttype == 2)
2032 s = "Update only pgbench_accounts";
2033 else if (ttype == 1)
2038 printf("transaction type: %s\n", s);
2039 printf("scaling factor: %d\n", scale);
2040 printf("query mode: %s\n", QUERYMODE[querymode]);
2041 printf("number of clients: %d\n", nclients);
2042 printf("number of threads: %d\n", nthreads);
2045 printf("number of transactions per client: %d\n", nxacts);
2046 printf("number of transactions actually processed: %d/%d\n",
2047 normal_xacts, nxacts * nclients);
2051 printf("duration: %d s\n", duration);
2052 printf("number of transactions actually processed: %d\n",
2055 printf("tps = %f (including connections establishing)\n", tps_include);
2056 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2058 /* Report per-command latencies */
2063 for (i = 0; i < num_files; i++)
2068 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2070 printf("statement latencies in milliseconds:\n");
2072 for (commands = sql_files[i]; *commands != NULL; commands++)
2074 Command *command = *commands;
2075 int cnum = command->command_num;
2077 instr_time total_exec_elapsed;
2078 int total_exec_count;
2081 /* Accumulate per-thread data for command */
2082 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2083 total_exec_count = 0;
2084 for (t = 0; t < nthreads; t++)
2086 TState *thread = &threads[t];
2088 INSTR_TIME_ADD(total_exec_elapsed,
2089 thread->exec_elapsed[cnum]);
2090 total_exec_count += thread->exec_count[cnum];
2093 if (total_exec_count > 0)
2094 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2098 printf("\t%f\t%s\n", total_time, command->line);
2106 main(int argc, char **argv)
2108 static struct option long_options[] = {
2109 /* systematic long/short named options*/
2110 {"client", required_argument, NULL, 'c'},
2111 {"connect", no_argument, NULL, 'C'},
2112 {"debug", no_argument, NULL, 'd'},
2113 {"define", required_argument, NULL, 'D'},
2114 {"file", required_argument, NULL, 'f'},
2115 {"fillfactor", required_argument, NULL, 'F'},
2116 {"host", required_argument, NULL, 'h'},
2117 {"initialize", no_argument, NULL, 'i'},
2118 {"jobs", required_argument, NULL, 'j'},
2119 {"log", no_argument, NULL, 'l'},
2120 {"no-vacuum", no_argument, NULL, 'n'},
2121 {"port", required_argument, NULL, 'p'},
2122 {"protocol", required_argument, NULL, 'M'},
2123 {"quiet", no_argument, NULL, 'q'},
2124 {"report-latencies", no_argument, NULL, 'r'},
2125 {"scale", required_argument, NULL, 's'},
2126 {"select-only", no_argument, NULL, 'S'},
2127 {"skip-some-updates", no_argument, NULL, 'N'},
2128 {"time", required_argument, NULL, 'T'},
2129 {"transactions", required_argument, NULL, 't'},
2130 {"username", required_argument, NULL, 'U'},
2131 {"vacuum-all", no_argument, NULL, 'v'},
2132 /* long-named only options */
2133 {"foreign-keys", no_argument, &foreign_keys, 1},
2134 {"index-tablespace", required_argument, NULL, 3},
2135 {"tablespace", required_argument, NULL, 2},
2136 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2137 {"sampling-rate", required_argument, NULL, 4},
2138 {"aggregate-interval", required_argument, NULL, 5},
2143 int nclients = 1; /* default number of simulated clients */
2144 int nthreads = 1; /* default number of threads */
2145 int is_init_mode = 0; /* initialize mode? */
2146 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2147 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2148 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2149 * 2: skip update of branches and tellers */
2151 char *filename = NULL;
2152 bool scale_given = false;
2154 CState *state; /* status of clients */
2155 TState *threads; /* array of thread */
2157 instr_time start_time; /* start up time */
2158 instr_time total_time;
2159 instr_time conn_total_time;
2164 #ifdef HAVE_GETRLIMIT
2174 progname = get_progname(argv[0]);
2178 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2183 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2185 puts("pgbench (PostgreSQL) " PG_VERSION);
2191 /* stderr is buffered on Win32. */
2192 setvbuf(stderr, NULL, _IONBF, 0);
2195 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2197 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2199 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2202 state = (CState *) pg_malloc(sizeof(CState));
2203 memset(state, 0, sizeof(CState));
2205 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
2213 pghost = pg_strdup(optarg);
2219 do_vacuum_accounts++;
2222 pgport = pg_strdup(optarg);
2234 nclients = atoi(optarg);
2235 if (nclients <= 0 || nclients > MAXCLIENTS)
2237 fprintf(stderr, "invalid number of clients: %d\n", nclients);
2240 #ifdef HAVE_GETRLIMIT
2241 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2242 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2243 #else /* but BSD doesn't ... */
2244 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2245 #endif /* RLIMIT_NOFILE */
2247 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2250 if (rlim.rlim_cur <= (nclients + 2))
2252 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2253 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2256 #endif /* HAVE_GETRLIMIT */
2258 case 'j': /* jobs */
2259 nthreads = atoi(optarg);
2262 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2270 is_latencies = true;
2274 scale = atoi(optarg);
2277 fprintf(stderr, "invalid scaling factor: %d\n", scale);
2284 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2287 nxacts = atoi(optarg);
2290 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2297 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2300 duration = atoi(optarg);
2303 fprintf(stderr, "invalid duration: %d\n", duration);
2308 login = pg_strdup(optarg);
2318 filename = pg_strdup(optarg);
2319 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2326 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2328 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2333 if (!putVariable(&state[0], "option", optarg, p))
2338 fillfactor = atoi(optarg);
2339 if ((fillfactor < 10) || (fillfactor > 100))
2341 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2348 fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
2351 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2352 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2354 if (querymode >= NUM_QUERYMODE)
2356 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2361 /* This covers long options which take no argument. */
2363 case 2: /* tablespace */
2364 tablespace = pg_strdup(optarg);
2366 case 3: /* index-tablespace */
2367 index_tablespace = pg_strdup(optarg);
2370 sample_rate = atof(optarg);
2371 if (sample_rate <= 0.0 || sample_rate > 1.0)
2373 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2379 fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
2382 agg_interval = atoi(optarg);
2383 if (agg_interval <= 0)
2385 fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
2391 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2398 dbName = argv[optind];
2401 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2403 else if (login != NULL && *login != '\0')
2415 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2416 if (nxacts <= 0 && duration <= 0)
2417 nxacts = DEFAULT_NXACTS;
2419 if (nclients % nthreads != 0)
2421 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2425 /* --sampling-rate may be used only with -l */
2426 if (sample_rate > 0.0 && !use_log)
2428 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2432 /* -q may be used only with -i */
2433 if (use_quiet && !is_init_mode)
2435 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2439 /* --sampling-rate may must not be used with --aggregate-interval */
2440 if (sample_rate > 0.0 && agg_interval > 0)
2442 fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
2446 if (agg_interval > 0 && (!use_log))
2448 fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
2452 if ((duration > 0) && (agg_interval > duration))
2454 fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
2458 if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
2460 fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
2465 * is_latencies only works with multiple threads in thread-based
2466 * implementations, not fork-based ones, because it supposes that the
2467 * parent can see changes made to the per-thread execution stats by child
2468 * threads. It seems useful enough to accept despite this limitation, but
2469 * perhaps we should FIXME someday (by passing the stats data back up
2470 * through the parent-to-child pipes).
2472 #ifndef ENABLE_THREAD_SAFETY
2473 if (is_latencies && nthreads > 1)
2475 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2481 * save main process id in the global variable because process id will be
2482 * changed after fork.
2484 main_pid = (int) getpid();
2488 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2489 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2491 /* copy any -D switch values to all clients */
2492 for (i = 1; i < nclients; i++)
2497 for (j = 0; j < state[0].nvariables; j++)
2499 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2508 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2509 pghost, pgport, nclients, nxacts, dbName);
2511 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2512 pghost, pgport, nclients, duration, dbName);
2515 /* opening connection... */
2520 if (PQstatus(con) == CONNECTION_BAD)
2522 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2523 fprintf(stderr, "%s", PQerrorMessage(con));
2530 * get the scaling factor that should be same as count(*) from
2531 * pgbench_branches if this is not a custom query
2533 res = PQexec(con, "select count(*) from pgbench_branches");
2534 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2536 fprintf(stderr, "%s", PQerrorMessage(con));
2539 scale = atoi(PQgetvalue(res, 0, 0));
2542 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2547 /* warn if we override user-given -s switch */
2550 "Scale option ignored, using pgbench_branches table count = %d\n",
2555 * :scale variables normally get -s or database scale, but don't override
2556 * an explicit -D switch
2558 if (getVariable(&state[0], "scale") == NULL)
2560 snprintf(val, sizeof(val), "%d", scale);
2561 for (i = 0; i < nclients; i++)
2563 if (!putVariable(&state[i], "startup", "scale", val))
2569 * Define a :client_id variable that is unique per connection. But don't
2570 * override an explicit -D switch.
2572 if (getVariable(&state[0], "client_id") == NULL)
2574 for (i = 0; i < nclients; i++)
2576 snprintf(val, sizeof(val), "%d", i);
2577 if (!putVariable(&state[i], "startup", "client_id", val))
2584 fprintf(stderr, "starting vacuum...");
2585 executeStatement(con, "vacuum pgbench_branches");
2586 executeStatement(con, "vacuum pgbench_tellers");
2587 executeStatement(con, "truncate pgbench_history");
2588 fprintf(stderr, "end.\n");
2590 if (do_vacuum_accounts)
2592 fprintf(stderr, "starting vacuum pgbench_accounts...");
2593 executeStatement(con, "vacuum analyze pgbench_accounts");
2594 fprintf(stderr, "end.\n");
2599 /* set random seed */
2600 INSTR_TIME_SET_CURRENT(start_time);
2601 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2603 /* process builtin SQL scripts */
2607 sql_files[0] = process_builtin(tpc_b);
2612 sql_files[0] = process_builtin(select_only);
2617 sql_files[0] = process_builtin(simple_update);
2625 /* set up thread data structures */
2626 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2627 for (i = 0; i < nthreads; i++)
2629 TState *thread = &threads[i];
2632 thread->state = &state[nclients / nthreads * i];
2633 thread->nstate = nclients / nthreads;
2634 thread->random_state[0] = random();
2635 thread->random_state[1] = random();
2636 thread->random_state[2] = random();
2640 /* Reserve memory for the thread to store per-command latencies */
2643 thread->exec_elapsed = (instr_time *)
2644 pg_malloc(sizeof(instr_time) * num_commands);
2645 thread->exec_count = (int *)
2646 pg_malloc(sizeof(int) * num_commands);
2648 for (t = 0; t < num_commands; t++)
2650 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2651 thread->exec_count[t] = 0;
2656 thread->exec_elapsed = NULL;
2657 thread->exec_count = NULL;
2661 /* get start up time */
2662 INSTR_TIME_SET_CURRENT(start_time);
2664 /* set alarm if duration is specified. */
2669 for (i = 0; i < nthreads; i++)
2671 TState *thread = &threads[i];
2673 INSTR_TIME_SET_CURRENT(thread->start_time);
2675 /* the first thread (i = 0) is executed by main thread */
2678 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
2680 if (err != 0 || thread->thread == INVALID_THREAD)
2682 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2688 thread->thread = INVALID_THREAD;
2692 /* wait for threads and accumulate results */
2694 INSTR_TIME_SET_ZERO(conn_total_time);
2695 for (i = 0; i < nthreads; i++)
2699 if (threads[i].thread == INVALID_THREAD)
2700 ret = threadRun(&threads[i]);
2702 pthread_join(threads[i].thread, &ret);
2706 TResult *r = (TResult *) ret;
2708 total_xacts += r->xacts;
2709 INSTR_TIME_ADD(conn_total_time, r->conn_time);
2713 disconnect_all(state, nclients);
2716 INSTR_TIME_SET_CURRENT(total_time);
2717 INSTR_TIME_SUBTRACT(total_time, start_time);
2718 printResults(ttype, total_xacts, nclients, threads, nthreads,
2719 total_time, conn_total_time);
2725 threadRun(void *arg)
2727 TState *thread = (TState *) arg;
2728 CState *state = thread->state;
2730 FILE *logfile = NULL; /* per-thread log file */
2733 int nstate = thread->nstate;
2734 int remains = nstate; /* number of remaining clients */
2739 result = pg_malloc(sizeof(TResult));
2741 INSTR_TIME_SET_ZERO(result->conn_time);
2743 /* open log file if requested */
2748 if (thread->tid == 0)
2749 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
2751 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
2752 logfile = fopen(logpath, "w");
2754 if (logfile == NULL)
2756 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
2763 /* make connections to the database */
2764 for (i = 0; i < nstate; i++)
2766 if ((state[i].con = doConnect()) == NULL)
2771 /* time after thread and connections set up */
2772 INSTR_TIME_SET_CURRENT(result->conn_time);
2773 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
2775 agg_vals_init(&aggs, thread->start_time);
2777 /* send start up queries in async manner */
2778 for (i = 0; i < nstate; i++)
2780 CState *st = &state[i];
2781 Command **commands = sql_files[st->use_file];
2782 int prev_ecnt = st->ecnt;
2784 st->use_file = getrand(thread, 0, num_files - 1);
2785 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
2786 remains--; /* I've aborted */
2788 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2790 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
2791 remains--; /* I've aborted */
2800 int maxsock; /* max socket number to be waited */
2804 FD_ZERO(&input_mask);
2807 min_usec = INT64_MAX;
2808 for (i = 0; i < nstate; i++)
2810 CState *st = &state[i];
2811 Command **commands = sql_files[st->use_file];
2818 if (min_usec == INT64_MAX)
2822 INSTR_TIME_SET_CURRENT(now);
2823 now_usec = INSTR_TIME_GET_MICROSEC(now);
2826 this_usec = st->until - now_usec;
2827 if (min_usec > this_usec)
2828 min_usec = this_usec;
2830 else if (st->con == NULL)
2834 else if (commands[st->state]->type == META_COMMAND)
2836 min_usec = 0; /* the connection is ready to run */
2840 sock = PQsocket(st->con);
2843 fprintf(stderr, "bad socket: %s\n", strerror(errno));
2847 FD_SET(sock, &input_mask);
2853 if (min_usec > 0 && maxsock != -1)
2855 int nsocks; /* return from select(2) */
2857 if (min_usec != INT64_MAX)
2859 struct timeval timeout;
2861 timeout.tv_sec = min_usec / 1000000;
2862 timeout.tv_usec = min_usec % 1000000;
2863 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
2866 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
2871 /* must be something wrong */
2872 fprintf(stderr, "select failed: %s\n", strerror(errno));
2877 /* ok, backend returns reply */
2878 for (i = 0; i < nstate; i++)
2880 CState *st = &state[i];
2881 Command **commands = sql_files[st->use_file];
2882 int prev_ecnt = st->ecnt;
2884 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
2885 || commands[st->state]->type == META_COMMAND))
2887 if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
2888 remains--; /* I've aborted */
2891 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2893 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
2894 remains--; /* I've aborted */
2902 INSTR_TIME_SET_CURRENT(start);
2903 disconnect_all(state, nstate);
2905 for (i = 0; i < nstate; i++)
2906 result->xacts += state[i].cnt;
2907 INSTR_TIME_SET_CURRENT(end);
2908 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
2915 * Support for duration option: set timer_exceeded after so many seconds.
2921 handle_sig_alarm(SIGNAL_ARGS)
2923 timer_exceeded = true;
2927 setalarm(int seconds)
2929 pqsignal(SIGALRM, handle_sig_alarm);
2933 #ifndef ENABLE_THREAD_SAFETY
2936 * implements pthread using fork.
2939 typedef struct fork_pthread
2946 pthread_create(pthread_t *thread,
2947 pthread_attr_t *attr,
2948 void *(*start_routine) (void *),
2954 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
2955 if (pipe(th->pipes) < 0)
2962 if (th->pid == -1) /* error */
2967 if (th->pid != 0) /* in parent process */
2969 close(th->pipes[1]);
2974 /* in child process */
2975 close(th->pipes[0]);
2977 /* set alarm again because the child does not inherit timers */
2981 ret = start_routine(arg);
2982 write(th->pipes[1], ret, sizeof(TResult));
2983 close(th->pipes[1]);
2989 pthread_join(pthread_t th, void **thread_return)
2993 while (waitpid(th->pid, &status, 0) != th->pid)
2999 if (thread_return != NULL)
3001 /* assume result is TResult */
3002 *thread_return = pg_malloc(sizeof(TResult));
3003 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
3005 free(*thread_return);
3006 *thread_return = NULL;
3009 close(th->pipes[0]);
3017 static VOID CALLBACK
3018 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
3020 timer_exceeded = true;
3024 setalarm(int seconds)
3029 /* This function will be called at most once, so we can cheat a bit. */
3030 queue = CreateTimerQueue();
3031 if (seconds > ((DWORD) -1) / 1000 ||
3032 !CreateTimerQueueTimer(&timer, queue,
3033 win32_timer_callback, NULL, seconds * 1000, 0,
3034 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
3036 fprintf(stderr, "Failed to set timer\n");
3041 /* partial pthread implementation for Windows */
3043 typedef struct win32_pthread
3046 void *(*routine) (void *);
3051 static unsigned __stdcall
3052 win32_pthread_run(void *arg)
3054 win32_pthread *th = (win32_pthread *) arg;
3056 th->result = th->routine(th->arg);
3062 pthread_create(pthread_t *thread,
3063 pthread_attr_t *attr,
3064 void *(*start_routine) (void *),
3070 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
3071 th->routine = start_routine;
3075 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
3076 if (th->handle == NULL)
3088 pthread_join(pthread_t th, void **thread_return)
3090 if (th == NULL || th->handle == NULL)
3091 return errno = EINVAL;
3093 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
3095 _dosmaperr(GetLastError());
3100 *thread_return = th->result;
3102 CloseHandle(th->handle);