4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * contrib/pgbench/pgbench.c
8 * Copyright (c) 2000-2013, PostgreSQL Global Development Group
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 #define FD_SETSIZE 1024 /* set before winsock2.h is included */
34 #include "postgres_fe.h"
36 #include "getopt_long.h"
38 #include "libpq/pqsignal.h"
39 #include "portability/instr_time.h"
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
53 #ifdef HAVE_SYS_RESOURCE_H
54 #include <sys/resource.h> /* for getrlimit */
58 #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF)
62 * Multi-platform pthread implementations
66 /* Use native win32 threads on Windows */
67 typedef struct win32_pthread *pthread_t;
68 typedef int pthread_attr_t;
70 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
71 static int pthread_join(pthread_t th, void **thread_return);
72 #elif defined(ENABLE_THREAD_SAFETY)
73 /* Use platform-dependent pthread capability */
76 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
80 #define pthread_t pg_pthread_t
81 #define pthread_attr_t pg_pthread_attr_t
82 #define pthread_create pg_pthread_create
83 #define pthread_join pg_pthread_join
85 typedef struct fork_pthread *pthread_t;
86 typedef int pthread_attr_t;
88 static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
89 static int pthread_join(pthread_t th, void **thread_return);
96 /********************************************************************
97 * some configurable parameters */
99 /* max number of clients allowed */
101 #define MAXCLIENTS (FD_SETSIZE - 10)
103 #define MAXCLIENTS 1024
106 #define LOG_STEP_SECONDS 5 /* seconds between log messages */
107 #define DEFAULT_NXACTS 10 /* default nxacts */
109 int nxacts = 0; /* number of transactions per client */
110 int duration = 0; /* duration in seconds */
113 * scaling factor. for example, scale = 10 will make 1000000 tuples in
114 * pgbench_accounts table.
119 * fillfactor. for example, fillfactor = 90 will use only 90 percent
120 * space during inserts and leave 10 percent free.
122 int fillfactor = 100;
125 * create foreign key constraints on the tables?
127 int foreign_keys = 0;
130 * use unlogged tables?
132 int unlogged_tables = 0;
135 * log sampling rate (1.0 = log everything, 0.0 = option not given)
137 double sample_rate = 0.0;
140 * tablespace selection
142 char *tablespace = NULL;
143 char *index_tablespace = NULL;
146 * end of configurable parameters
147 *********************************************************************/
149 #define nbranches 1 /* Makes little sense to change this. Change
152 #define naccounts 100000
155 * The scale factor at/beyond which 32bit integers are incapable of storing
158 * Although the actual threshold is 21474, we use 20000 because it is easier to
159 * document and remember, and isn't that far away from the real threshold.
161 #define SCALE_32BIT_THRESHOLD 20000
163 bool use_log; /* log transaction latencies to a file */
164 bool use_quiet; /* quiet logging onto stderr */
165 bool is_connect; /* establish connection for each transaction */
166 bool is_latencies; /* report per-command latencies */
167 int main_pid; /* main process id used in log filename */
173 const char *progname;
175 volatile bool timer_exceeded = false; /* flag from signal handler */
177 /* variable definitions */
180 char *name; /* variable name */
181 char *value; /* its value */
184 #define MAX_FILES 128 /* max number of SQL script files allowed */
185 #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
188 * structures used in custom query mode
193 PGconn *con; /* connection handle to DB */
194 int id; /* client No. */
195 int state; /* state No. */
196 int cnt; /* xacts count */
197 int ecnt; /* error count */
198 int listen; /* 0 indicates that an async query has been
200 int sleeping; /* 1 indicates that the client is napping */
201 int64 until; /* napping until (usec) */
202 Variable *variables; /* array of variable definitions */
204 instr_time txn_begin; /* used for measuring transaction latencies */
205 instr_time stmt_begin; /* used for measuring statement latencies */
206 int use_file; /* index in sql_files for this client */
207 bool prepared[MAX_FILES];
211 * Thread state and result
215 int tid; /* thread id */
216 pthread_t thread; /* thread handle */
217 CState *state; /* array of CState */
218 int nstate; /* length of state[] */
219 instr_time start_time; /* thread start time */
220 instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
221 int *exec_count; /* number of cmd executions (per Command) */
222 unsigned short random_state[3]; /* separate randomness for each thread */
225 #define INVALID_THREAD ((pthread_t) 0)
229 instr_time conn_time;
234 * queries read from files
236 #define SQL_COMMAND 1
237 #define META_COMMAND 2
240 typedef enum QueryMode
242 QUERY_SIMPLE, /* simple query */
243 QUERY_EXTENDED, /* extended query */
244 QUERY_PREPARED, /* extended query with prepared statements */
248 static QueryMode querymode = QUERY_SIMPLE;
249 static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
253 char *line; /* full text of command line */
254 int command_num; /* unique index of this Command struct */
255 int type; /* command type (SQL_COMMAND or META_COMMAND) */
256 int argc; /* number of command words */
257 char *argv[MAX_ARGS]; /* command word list */
260 static Command **sql_files[MAX_FILES]; /* SQL script files */
261 static int num_files; /* number of script files */
262 static int num_commands = 0; /* total number of Command structs */
263 static int debug = 0; /* debug flag */
265 /* default scenario */
266 static char *tpc_b = {
267 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
268 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
269 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
270 "\\setrandom aid 1 :naccounts\n"
271 "\\setrandom bid 1 :nbranches\n"
272 "\\setrandom tid 1 :ntellers\n"
273 "\\setrandom delta -5000 5000\n"
275 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
276 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
277 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
278 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
279 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
284 static char *simple_update = {
285 "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
286 "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
287 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
288 "\\setrandom aid 1 :naccounts\n"
289 "\\setrandom bid 1 :nbranches\n"
290 "\\setrandom tid 1 :ntellers\n"
291 "\\setrandom delta -5000 5000\n"
293 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
294 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
295 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
300 static char *select_only = {
301 "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
302 "\\setrandom aid 1 :naccounts\n"
303 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
306 /* Function prototypes */
307 static void setalarm(int seconds);
308 static void *threadRun(void *arg);
312 * routines to check mem allocations and fail noisily.
315 pg_malloc(size_t size)
319 /* Avoid unportable behavior of malloc(0) */
322 result = malloc(size);
325 fprintf(stderr, "out of memory\n");
332 pg_realloc(void *ptr, size_t size)
336 /* Avoid unportable behavior of realloc(NULL, 0) */
337 if (ptr == NULL && size == 0)
339 result = realloc(ptr, size);
342 fprintf(stderr, "out of memory\n");
349 pg_strdup(const char *s)
356 fprintf(stderr, "out of memory\n");
366 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
368 " %s [OPTION]... [DBNAME]\n"
369 "\nInitialization options:\n"
370 " -i invokes initialization mode\n"
371 " -n do not run VACUUM after initialization\n"
372 " -F NUM fill factor\n"
373 " -s NUM scaling factor\n"
374 " -q quiet logging (one message each 5 seconds)\n"
376 " create foreign key constraints between tables\n"
377 " --index-tablespace=TABLESPACE\n"
378 " create indexes in the specified tablespace\n"
379 " --tablespace=TABLESPACE\n"
380 " create tables in the specified tablespace\n"
381 " --unlogged-tables\n"
382 " create tables as unlogged tables\n"
383 "\nBenchmarking options:\n"
384 " -c NUM number of concurrent database clients (default: 1)\n"
385 " -C establish new connection for each transaction\n"
386 " -D VARNAME=VALUE\n"
387 " define variable for use by custom script\n"
388 " -f FILENAME read transaction script from FILENAME\n"
389 " -j NUM number of threads (default: 1)\n"
390 " -l write transaction times to log file\n"
391 " --sampling-rate NUM\n"
392 " fraction of transactions to log (e.g. 0.01 for 1%% sample)\n"
393 " -M simple|extended|prepared\n"
394 " protocol for submitting queries to server (default: simple)\n"
395 " -n do not run VACUUM before tests\n"
396 " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
397 " -r report average latency per command\n"
398 " -s NUM report this scale factor in output\n"
399 " -S perform SELECT-only transactions\n"
400 " -t NUM number of transactions each client runs (default: 10)\n"
401 " -T NUM duration of benchmark test in seconds\n"
402 " -v vacuum all four standard tables before tests\n"
403 "\nCommon options:\n"
404 " -d print debugging output\n"
405 " -h HOSTNAME database server host or socket directory\n"
406 " -p PORT database server port number\n"
407 " -U USERNAME connect as specified database user\n"
408 " -V, --version output version information, then exit\n"
409 " -?, --help show this help, then exit\n"
411 "Report bugs to <pgsql-bugs@postgresql.org>.\n",
416 * strtoint64 -- convert a string to 64-bit integer
418 * This function is a modified version of scanint8() from
419 * src/backend/utils/adt/int8.c.
422 strtoint64(const char *str)
424 const char *ptr = str;
429 * Do our own scan, rather than relying on sscanf which might be broken
433 /* skip leading spaces */
434 while (*ptr && isspace((unsigned char) *ptr))
443 * Do an explicit check for INT64_MIN. Ugly though this is, it's
444 * cleaner than trying to get the loop below to handle it portably.
446 if (strncmp(ptr, "9223372036854775808", 19) == 0)
448 result = -INT64CONST(0x7fffffffffffffff) - 1;
454 else if (*ptr == '+')
457 /* require at least one digit */
458 if (!isdigit((unsigned char) *ptr))
459 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
462 while (*ptr && isdigit((unsigned char) *ptr))
464 int64 tmp = result * 10 + (*ptr++ - '0');
466 if ((tmp / 10) != result) /* overflow? */
467 fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
473 /* allow trailing whitespace, but not other trailing chars */
474 while (*ptr != '\0' && isspace((unsigned char) *ptr))
478 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
480 return ((sign < 0) ? -result : result);
483 /* random number generator: uniform distribution from min to max inclusive */
485 getrand(TState *thread, int64 min, int64 max)
488 * Odd coding is so that min and max have approximately the same chance of
489 * being selected as do numbers between them.
491 * pg_erand48() is thread-safe and concurrent, which is why we use it
492 * rather than random(), which in glibc is non-reentrant, and therefore
493 * protected by a mutex, and therefore a bottleneck on machines with many
496 return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
499 /* call PQexec() and exit() on failure */
501 executeStatement(PGconn *con, const char *sql)
505 res = PQexec(con, sql);
506 if (PQresultStatus(res) != PGRES_COMMAND_OK)
508 fprintf(stderr, "%s", PQerrorMessage(con));
514 /* set up a connection to the backend */
519 static char *password = NULL;
523 * Start the connection. Loop until we have a password if requested by
528 #define PARAMS_ARRAY_SIZE 7
530 const char *keywords[PARAMS_ARRAY_SIZE];
531 const char *values[PARAMS_ARRAY_SIZE];
533 keywords[0] = "host";
535 keywords[1] = "port";
537 keywords[2] = "user";
539 keywords[3] = "password";
540 values[3] = password;
541 keywords[4] = "dbname";
543 keywords[5] = "fallback_application_name";
544 values[5] = progname;
550 conn = PQconnectdbParams(keywords, values, true);
554 fprintf(stderr, "Connection to database \"%s\" failed\n",
559 if (PQstatus(conn) == CONNECTION_BAD &&
560 PQconnectionNeedsPassword(conn) &&
564 password = simple_prompt("Password: ", 100, false);
569 /* check to see that the backend connection was successfully made */
570 if (PQstatus(conn) == CONNECTION_BAD)
572 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
573 dbName, PQerrorMessage(conn));
581 /* throw away response from backend */
583 discard_response(CState *state)
589 res = PQgetResult(state->con);
596 compareVariables(const void *v1, const void *v2)
598 return strcmp(((const Variable *) v1)->name,
599 ((const Variable *) v2)->name);
603 getVariable(CState *st, char *name)
608 /* On some versions of Solaris, bsearch of zero items dumps core */
609 if (st->nvariables <= 0)
613 var = (Variable *) bsearch((void *) &key,
614 (void *) st->variables,
624 /* check whether the name consists of alphabets, numerals and underscores. */
626 isLegalVariableName(const char *name)
630 for (i = 0; name[i] != '\0'; i++)
632 if (!isalnum((unsigned char) name[i]) && name[i] != '_')
640 putVariable(CState *st, const char *context, char *name, char *value)
646 /* On some versions of Solaris, bsearch of zero items dumps core */
647 if (st->nvariables > 0)
648 var = (Variable *) bsearch((void *) &key,
649 (void *) st->variables,
661 * Check for the name only when declaring a new variable to avoid
664 if (!isLegalVariableName(name))
666 fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
671 newvars = (Variable *) pg_realloc(st->variables,
672 (st->nvariables + 1) * sizeof(Variable));
674 newvars = (Variable *) pg_malloc(sizeof(Variable));
676 st->variables = newvars;
678 var = &newvars[st->nvariables];
680 var->name = pg_strdup(name);
681 var->value = pg_strdup(value);
685 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
692 /* dup then free, in case value is pointing at this variable */
693 val = pg_strdup(value);
703 parseVariable(const char *sql, int *eaten)
711 } while (isalnum((unsigned char) sql[i]) || sql[i] == '_');
716 memcpy(name, &sql[1], i - 1);
724 replaceVariable(char **sql, char *param, int len, char *value)
726 int valueln = strlen(value);
730 size_t offset = param - *sql;
732 *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
733 param = *sql + offset;
737 memmove(param + valueln, param + len, strlen(param + len) + 1);
738 strncpy(param, value, valueln);
740 return param + valueln;
744 assignVariables(CState *st, char *sql)
751 while ((p = strchr(p, ':')) != NULL)
755 name = parseVariable(p, &eaten);
765 val = getVariable(st, name);
773 p = replaceVariable(&sql, p, eaten, val);
780 getQueryParams(CState *st, const Command *command, const char **params)
784 for (i = 0; i < command->argc - 1; i++)
785 params[i] = getVariable(st, command->argv[i + 1]);
789 * Run a shell command. The result is assigned to the variable if not NULL.
790 * Return true if succeeded, or false on error.
793 runShellCommand(CState *st, char *variable, char **argv, int argc)
795 char command[SHELL_COMMAND_SIZE];
804 * Join arguments with whitespace separators. Arguments starting with
805 * exactly one colon are treated as variables:
806 * name - append a string "name"
807 * :var - append a variable named 'var'
808 * ::name - append a string ":name"
811 for (i = 0; i < argc; i++)
816 if (argv[i][0] != ':')
818 arg = argv[i]; /* a string literal */
820 else if (argv[i][1] == ':')
822 arg = argv[i] + 1; /* a string literal starting with colons */
824 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
826 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
830 arglen = strlen(arg);
831 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
833 fprintf(stderr, "%s: too long shell command\n", argv[0]);
838 command[len++] = ' ';
839 memcpy(command + len, arg, arglen);
845 /* Fast path for non-assignment case */
846 if (variable == NULL)
851 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
857 /* Execute the command with pipe and read the standard output. */
858 if ((fp = popen(command, "r")) == NULL)
860 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
863 if (fgets(res, sizeof(res), fp) == NULL)
866 fprintf(stderr, "%s: cannot read the result\n", argv[0]);
871 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
875 /* Check whether the result is an integer and assign it to the variable */
876 retval = (int) strtol(res, &endptr, 10);
877 while (*endptr != '\0' && isspace((unsigned char) *endptr))
879 if (*res == '\0' || *endptr != '\0')
881 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
884 snprintf(res, sizeof(res), "%d", retval);
885 if (!putVariable(st, "setshell", variable, res))
889 printf("shell parameter name: %s, value: %s\n", argv[1], res);
894 #define MAX_PREPARE_NAME 32
896 preparedStatementName(char *buffer, int file, int state)
898 sprintf(buffer, "P%d_%d", file, state);
902 clientDone(CState *st, bool ok)
904 (void) ok; /* unused */
911 return false; /* always false */
914 /* return false iff client should be disconnected */
916 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile)
922 commands = sql_files[st->use_file];
925 { /* are we sleeping? */
928 INSTR_TIME_SET_CURRENT(now);
929 if (st->until <= INSTR_TIME_GET_MICROSEC(now))
930 st->sleeping = 0; /* Done sleeping, go ahead with next command */
932 return true; /* Still sleeping, nothing to do here */
936 { /* are we receiver? */
937 if (commands[st->state]->type == SQL_COMMAND)
940 fprintf(stderr, "client %d receiving\n", st->id);
941 if (!PQconsumeInput(st->con))
942 { /* there's something wrong */
943 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
944 return clientDone(st, false);
946 if (PQisBusy(st->con))
947 return true; /* don't have the whole result yet */
951 * command finished: accumulate per-command execution times in
952 * thread-local data structure, if per-command latencies are requested
957 int cnum = commands[st->state]->command_num;
959 INSTR_TIME_SET_CURRENT(now);
960 INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
961 now, st->stmt_begin);
962 thread->exec_count[cnum]++;
966 * if transaction finished, record the time it took in the log
968 if (logfile && commands[st->state + 1] == NULL)
975 * write the log entry if this row belongs to the random sample,
976 * or no sampling rate was given which means log everything.
978 if (sample_rate == 0.0 ||
979 pg_erand48(thread->random_state) <= sample_rate)
982 INSTR_TIME_SET_CURRENT(now);
984 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
985 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
988 /* This is more than we really ought to know about instr_time */
989 fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
990 st->id, st->cnt, usec, st->use_file,
991 (long) now.tv_sec, (long) now.tv_usec);
993 /* On Windows, instr_time doesn't provide a timestamp anyway */
994 fprintf(logfile, "%d %d %.0f %d 0 0\n",
995 st->id, st->cnt, usec, st->use_file);
1000 if (commands[st->state]->type == SQL_COMMAND)
1003 * Read and discard the query result; note this is not included in
1004 * the statement latency numbers.
1006 res = PQgetResult(st->con);
1007 switch (PQresultStatus(res))
1009 case PGRES_COMMAND_OK:
1010 case PGRES_TUPLES_OK:
1013 fprintf(stderr, "Client %d aborted in state %d: %s",
1014 st->id, st->state, PQerrorMessage(st->con));
1016 return clientDone(st, false);
1019 discard_response(st);
1022 if (commands[st->state + 1] == NULL)
1031 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1032 return clientDone(st, true); /* exit success */
1035 /* increment state counter */
1037 if (commands[st->state] == NULL)
1040 st->use_file = (int) getrand(thread, 0, num_files - 1);
1041 commands = sql_files[st->use_file];
1045 if (st->con == NULL)
1050 INSTR_TIME_SET_CURRENT(start);
1051 if ((st->con = doConnect()) == NULL)
1053 fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1054 return clientDone(st, false);
1056 INSTR_TIME_SET_CURRENT(end);
1057 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1060 /* Record transaction start time if logging is enabled */
1061 if (logfile && st->state == 0)
1062 INSTR_TIME_SET_CURRENT(st->txn_begin);
1064 /* Record statement start time if per-command latencies are requested */
1066 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1068 if (commands[st->state]->type == SQL_COMMAND)
1070 const Command *command = commands[st->state];
1073 if (querymode == QUERY_SIMPLE)
1077 sql = pg_strdup(command->argv[0]);
1078 sql = assignVariables(st, sql);
1081 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1082 r = PQsendQuery(st->con, sql);
1085 else if (querymode == QUERY_EXTENDED)
1087 const char *sql = command->argv[0];
1088 const char *params[MAX_ARGS];
1090 getQueryParams(st, command, params);
1093 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1094 r = PQsendQueryParams(st->con, sql, command->argc - 1,
1095 NULL, params, NULL, NULL, 0);
1097 else if (querymode == QUERY_PREPARED)
1099 char name[MAX_PREPARE_NAME];
1100 const char *params[MAX_ARGS];
1102 if (!st->prepared[st->use_file])
1106 for (j = 0; commands[j] != NULL; j++)
1109 char name[MAX_PREPARE_NAME];
1111 if (commands[j]->type != SQL_COMMAND)
1113 preparedStatementName(name, st->use_file, j);
1114 res = PQprepare(st->con, name,
1115 commands[j]->argv[0], commands[j]->argc - 1, NULL);
1116 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1117 fprintf(stderr, "%s", PQerrorMessage(st->con));
1120 st->prepared[st->use_file] = true;
1123 getQueryParams(st, command, params);
1124 preparedStatementName(name, st->use_file, st->state);
1127 fprintf(stderr, "client %d sending %s\n", st->id, name);
1128 r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1129 params, NULL, NULL, 0);
1131 else /* unknown sql mode */
1137 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1141 st->listen = 1; /* flags that should be listened */
1143 else if (commands[st->state]->type == META_COMMAND)
1145 int argc = commands[st->state]->argc,
1147 char **argv = commands[st->state]->argv;
1151 fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1152 for (i = 1; i < argc; i++)
1153 fprintf(stderr, " %s", argv[i]);
1154 fprintf(stderr, "\n");
1157 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1164 if (*argv[2] == ':')
1166 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1168 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1172 min = strtoint64(var);
1175 min = strtoint64(argv[2]);
1180 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1186 if (*argv[3] == ':')
1188 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1190 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1194 max = strtoint64(var);
1197 max = strtoint64(argv[3]);
1201 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1207 * getrand() needs to be able to subtract max from min and add
1208 * one to the result without overflowing. Since we know max > min,
1209 * we can detect overflow just by checking for a negative result.
1210 * But we must check both that the subtraction doesn't overflow,
1211 * and that adding one to the result doesn't overflow either.
1213 if (max - min < 0 || (max - min) + 1 < 0)
1215 fprintf(stderr, "%s: range too large\n", argv[0]);
1221 printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1223 snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1225 if (!putVariable(st, argv[0], argv[1], res))
1233 else if (pg_strcasecmp(argv[0], "set") == 0)
1240 if (*argv[2] == ':')
1242 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1244 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1248 ope1 = strtoint64(var);
1251 ope1 = strtoint64(argv[2]);
1254 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1257 if (*argv[4] == ':')
1259 if ((var = getVariable(st, argv[4] + 1)) == NULL)
1261 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1265 ope2 = strtoint64(var);
1268 ope2 = strtoint64(argv[4]);
1270 if (strcmp(argv[3], "+") == 0)
1271 snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1272 else if (strcmp(argv[3], "-") == 0)
1273 snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1274 else if (strcmp(argv[3], "*") == 0)
1275 snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1276 else if (strcmp(argv[3], "/") == 0)
1280 fprintf(stderr, "%s: division by zero\n", argv[0]);
1284 snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1288 fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1294 if (!putVariable(st, argv[0], argv[1], res))
1302 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1308 if (*argv[1] == ':')
1310 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1312 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1319 usec = atoi(argv[1]);
1323 if (pg_strcasecmp(argv[2], "ms") == 0)
1325 else if (pg_strcasecmp(argv[2], "s") == 0)
1331 INSTR_TIME_SET_CURRENT(now);
1332 st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1337 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1339 bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1341 if (timer_exceeded) /* timeout */
1342 return clientDone(st, true);
1343 else if (!ret) /* on error */
1348 else /* succeeded */
1351 else if (pg_strcasecmp(argv[0], "shell") == 0)
1353 bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1355 if (timer_exceeded) /* timeout */
1356 return clientDone(st, true);
1357 else if (!ret) /* on error */
1362 else /* succeeded */
1371 /* discard connections */
1373 disconnect_all(CState *state, int length)
1377 for (i = 0; i < length; i++)
1381 PQfinish(state[i].con);
1382 state[i].con = NULL;
1387 /* create tables and setup data */
1389 init(bool is_no_vacuum)
1392 /* The scale factor at/beyond which 32bit integers are incapable of storing
1395 * Although the actual threshold is 21474, we use 20000 because it is easier to
1396 * document and remember, and isn't that far away from the real threshold.
1398 #define SCALE_32BIT_THRESHOLD 20000
1401 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1402 * fields in these table declarations were intended to comply with that.
1403 * But because they default to NULLs, they don't actually take any space.
1404 * We could fix that by giving them non-null default values. However, that
1405 * would completely break comparability of pgbench results with prior
1406 * versions. Since pgbench has never pretended to be fully TPC-B
1407 * compliant anyway, we stick with the historical behavior.
1413 int declare_fillfactor;
1415 struct ddlinfo DDLs[] = {
1418 scale >= SCALE_32BIT_THRESHOLD
1419 ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
1420 : "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
1425 "tid int not null,bid int,tbalance int,filler char(84)",
1430 scale >= SCALE_32BIT_THRESHOLD
1431 ? "aid bigint not null,bid int,abalance int,filler char(84)"
1432 : "aid int not null,bid int,abalance int,filler char(84)",
1437 "bid int not null,bbalance int,filler char(88)",
1441 static char *DDLAFTERs[] = {
1442 "alter table pgbench_branches add primary key (bid)",
1443 "alter table pgbench_tellers add primary key (tid)",
1444 "alter table pgbench_accounts add primary key (aid)"
1446 static char *DDLKEYs[] = {
1447 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1448 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1449 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1450 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1451 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1460 /* used to track elapsed time and estimate of the remaining time */
1461 instr_time start, diff;
1462 double elapsed_sec, remaining_sec;
1463 int log_interval = 1;
1465 if ((con = doConnect()) == NULL)
1468 for (i = 0; i < lengthof(DDLs); i++)
1472 struct ddlinfo *ddl = &DDLs[i];
1474 /* Remove old table, if it exists. */
1475 snprintf(buffer, 256, "drop table if exists %s", ddl->table);
1476 executeStatement(con, buffer);
1478 /* Construct new create table statement. */
1480 if (ddl->declare_fillfactor)
1481 snprintf(opts + strlen(opts), 256 - strlen(opts),
1482 " with (fillfactor=%d)", fillfactor);
1483 if (tablespace != NULL)
1485 char *escape_tablespace;
1487 escape_tablespace = PQescapeIdentifier(con, tablespace,
1488 strlen(tablespace));
1489 snprintf(opts + strlen(opts), 256 - strlen(opts),
1490 " tablespace %s", escape_tablespace);
1491 PQfreemem(escape_tablespace);
1493 snprintf(buffer, 256, "create%s table %s(%s)%s",
1494 unlogged_tables ? " unlogged" : "",
1495 ddl->table, ddl->cols, opts);
1497 executeStatement(con, buffer);
1500 executeStatement(con, "begin");
1502 for (i = 0; i < nbranches * scale; i++)
1504 snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
1505 executeStatement(con, sql);
1508 for (i = 0; i < ntellers * scale; i++)
1510 snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1511 i + 1, i / ntellers + 1);
1512 executeStatement(con, sql);
1515 executeStatement(con, "commit");
1518 * fill the pgbench_accounts table with some data
1520 fprintf(stderr, "creating tables...\n");
1522 executeStatement(con, "begin");
1523 executeStatement(con, "truncate pgbench_accounts");
1525 res = PQexec(con, "copy pgbench_accounts from stdin");
1526 if (PQresultStatus(res) != PGRES_COPY_IN)
1528 fprintf(stderr, "%s", PQerrorMessage(con));
1533 INSTR_TIME_SET_CURRENT(start);
1535 for (k = 0; k < (int64) naccounts * scale; k++)
1539 snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
1540 if (PQputline(con, sql))
1542 fprintf(stderr, "PQputline failed\n");
1546 /* If we want to stick with the original logging, print a message each
1547 * 100k inserted rows. */
1548 if ((! use_quiet) && (j % 100000 == 0))
1550 INSTR_TIME_SET_CURRENT(diff);
1551 INSTR_TIME_SUBTRACT(diff, start);
1553 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1554 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1556 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1557 j, (int64)naccounts * scale,
1558 (int) (((int64) j * 100) / (naccounts * scale)),
1559 elapsed_sec, remaining_sec);
1561 /* let's not call the timing for each row, but only each 100 rows */
1562 else if (use_quiet && (j % 100 == 0))
1564 INSTR_TIME_SET_CURRENT(diff);
1565 INSTR_TIME_SUBTRACT(diff, start);
1567 elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1568 remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1570 /* have we reached the next interval (or end)? */
1571 if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) {
1573 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1574 j, (int64)naccounts * scale,
1575 (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
1577 /* skip to the next interval */
1578 log_interval = (int)ceil(elapsed_sec/LOG_STEP_SECONDS);
1583 if (PQputline(con, "\\.\n"))
1585 fprintf(stderr, "very last PQputline failed\n");
1590 fprintf(stderr, "PQendcopy failed\n");
1593 executeStatement(con, "commit");
1598 fprintf(stderr, "vacuum...\n");
1599 executeStatement(con, "vacuum analyze pgbench_branches");
1600 executeStatement(con, "vacuum analyze pgbench_tellers");
1601 executeStatement(con, "vacuum analyze pgbench_accounts");
1602 executeStatement(con, "vacuum analyze pgbench_history");
1608 fprintf(stderr, "set primary keys...\n");
1609 for (i = 0; i < lengthof(DDLAFTERs); i++)
1613 strncpy(buffer, DDLAFTERs[i], 256);
1615 if (index_tablespace != NULL)
1617 char *escape_tablespace;
1619 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1620 strlen(index_tablespace));
1621 snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
1622 " using index tablespace %s", escape_tablespace);
1623 PQfreemem(escape_tablespace);
1626 executeStatement(con, buffer);
1630 * create foreign keys
1634 fprintf(stderr, "set foreign keys...\n");
1635 for (i = 0; i < lengthof(DDLKEYs); i++)
1637 executeStatement(con, DDLKEYs[i]);
1642 fprintf(stderr, "done.\n");
1647 * Parse the raw sql and replace :param to $n.
1650 parseQuery(Command *cmd, const char *raw_sql)
1655 sql = pg_strdup(raw_sql);
1659 while ((p = strchr(p, ':')) != NULL)
1665 name = parseVariable(p, &eaten);
1675 if (cmd->argc >= MAX_ARGS)
1677 fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
1681 sprintf(var, "$%d", cmd->argc);
1682 p = replaceVariable(&sql, p, eaten, var);
1684 cmd->argv[cmd->argc] = name;
1692 /* Parse a command; return a Command struct, or NULL if it's a comment */
1694 process_commands(char *buf)
1696 const char delim[] = " \f\n\r\t\v";
1698 Command *my_commands;
1703 /* Make the string buf end at the next newline */
1704 if ((p = strchr(buf, '\n')) != NULL)
1707 /* Skip leading whitespace */
1709 while (isspace((unsigned char) *p))
1712 /* If the line is empty or actually a comment, we're done */
1713 if (*p == '\0' || strncmp(p, "--", 2) == 0)
1716 /* Allocate and initialize Command structure */
1717 my_commands = (Command *) pg_malloc(sizeof(Command));
1718 my_commands->line = pg_strdup(buf);
1719 my_commands->command_num = num_commands++;
1720 my_commands->type = 0; /* until set */
1721 my_commands->argc = 0;
1725 my_commands->type = META_COMMAND;
1728 tok = strtok(++p, delim);
1732 my_commands->argv[j++] = pg_strdup(tok);
1733 my_commands->argc++;
1734 tok = strtok(NULL, delim);
1737 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1739 if (my_commands->argc < 4)
1741 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1745 for (j = 4; j < my_commands->argc; j++)
1746 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1747 my_commands->argv[0], my_commands->argv[j]);
1749 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1751 if (my_commands->argc < 3)
1753 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1757 for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1758 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1759 my_commands->argv[0], my_commands->argv[j]);
1761 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1763 if (my_commands->argc < 2)
1765 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1770 * Split argument into number and unit to allow "sleep 1ms" etc.
1771 * We don't have to terminate the number argument with null
1772 * because it will be parsed with atoi, which ignores trailing
1773 * non-digit characters.
1775 if (my_commands->argv[1][0] != ':')
1777 char *c = my_commands->argv[1];
1779 while (isdigit((unsigned char) *c))
1783 my_commands->argv[2] = c;
1784 if (my_commands->argc < 3)
1785 my_commands->argc = 3;
1789 if (my_commands->argc >= 3)
1791 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1792 pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1793 pg_strcasecmp(my_commands->argv[2], "s") != 0)
1795 fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1796 my_commands->argv[0], my_commands->argv[2]);
1801 for (j = 3; j < my_commands->argc; j++)
1802 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1803 my_commands->argv[0], my_commands->argv[j]);
1805 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1807 if (my_commands->argc < 3)
1809 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1813 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
1815 if (my_commands->argc < 1)
1817 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
1823 fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
1829 my_commands->type = SQL_COMMAND;
1834 my_commands->argv[0] = pg_strdup(p);
1835 my_commands->argc++;
1837 case QUERY_EXTENDED:
1838 case QUERY_PREPARED:
1839 if (!parseQuery(my_commands, p))
1851 process_file(char *filename)
1853 #define COMMANDS_ALLOC_NUM 128
1855 Command **my_commands;
1861 if (num_files >= MAX_FILES)
1863 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
1867 alloc_num = COMMANDS_ALLOC_NUM;
1868 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1870 if (strcmp(filename, "-") == 0)
1872 else if ((fd = fopen(filename, "r")) == NULL)
1874 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
1880 while (fgets(buf, sizeof(buf), fd) != NULL)
1884 command = process_commands(buf);
1885 if (command == NULL)
1888 my_commands[lineno] = command;
1891 if (lineno >= alloc_num)
1893 alloc_num += COMMANDS_ALLOC_NUM;
1894 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1899 my_commands[lineno] = NULL;
1901 sql_files[num_files++] = my_commands;
1907 process_builtin(char *tb)
1909 #define COMMANDS_ALLOC_NUM 128
1911 Command **my_commands;
1916 alloc_num = COMMANDS_ALLOC_NUM;
1917 my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1927 while (*tb && *tb != '\n')
1938 command = process_commands(buf);
1939 if (command == NULL)
1942 my_commands[lineno] = command;
1945 if (lineno >= alloc_num)
1947 alloc_num += COMMANDS_ALLOC_NUM;
1948 my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1952 my_commands[lineno] = NULL;
1957 /* print out results */
1959 printResults(int ttype, int normal_xacts, int nclients,
1960 TState *threads, int nthreads,
1961 instr_time total_time, instr_time conn_total_time)
1963 double time_include,
1968 time_include = INSTR_TIME_GET_DOUBLE(total_time);
1969 tps_include = normal_xacts / time_include;
1970 tps_exclude = normal_xacts / (time_include -
1971 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
1974 s = "TPC-B (sort of)";
1975 else if (ttype == 2)
1976 s = "Update only pgbench_accounts";
1977 else if (ttype == 1)
1982 printf("transaction type: %s\n", s);
1983 printf("scaling factor: %d\n", scale);
1984 printf("query mode: %s\n", QUERYMODE[querymode]);
1985 printf("number of clients: %d\n", nclients);
1986 printf("number of threads: %d\n", nthreads);
1989 printf("number of transactions per client: %d\n", nxacts);
1990 printf("number of transactions actually processed: %d/%d\n",
1991 normal_xacts, nxacts * nclients);
1995 printf("duration: %d s\n", duration);
1996 printf("number of transactions actually processed: %d\n",
1999 printf("tps = %f (including connections establishing)\n", tps_include);
2000 printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2002 /* Report per-command latencies */
2007 for (i = 0; i < num_files; i++)
2012 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2014 printf("statement latencies in milliseconds:\n");
2016 for (commands = sql_files[i]; *commands != NULL; commands++)
2018 Command *command = *commands;
2019 int cnum = command->command_num;
2021 instr_time total_exec_elapsed;
2022 int total_exec_count;
2025 /* Accumulate per-thread data for command */
2026 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2027 total_exec_count = 0;
2028 for (t = 0; t < nthreads; t++)
2030 TState *thread = &threads[t];
2032 INSTR_TIME_ADD(total_exec_elapsed,
2033 thread->exec_elapsed[cnum]);
2034 total_exec_count += thread->exec_count[cnum];
2037 if (total_exec_count > 0)
2038 total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2042 printf("\t%f\t%s\n", total_time, command->line);
2050 main(int argc, char **argv)
2052 static struct option long_options[] = {
2053 {"foreign-keys", no_argument, &foreign_keys, 1},
2054 {"index-tablespace", required_argument, NULL, 3},
2055 {"tablespace", required_argument, NULL, 2},
2056 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2057 {"sampling-rate", required_argument, NULL, 4},
2062 int nclients = 1; /* default number of simulated clients */
2063 int nthreads = 1; /* default number of threads */
2064 int is_init_mode = 0; /* initialize mode? */
2065 int is_no_vacuum = 0; /* no vacuum at all before testing? */
2066 int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2067 int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only,
2068 * 2: skip update of branches and tellers */
2070 char *filename = NULL;
2071 bool scale_given = false;
2073 CState *state; /* status of clients */
2074 TState *threads; /* array of thread */
2076 instr_time start_time; /* start up time */
2077 instr_time total_time;
2078 instr_time conn_total_time;
2083 #ifdef HAVE_GETRLIMIT
2093 progname = get_progname(argv[0]);
2097 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2102 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2104 puts("pgbench (PostgreSQL) " PG_VERSION);
2110 /* stderr is buffered on Win32. */
2111 setvbuf(stderr, NULL, _IONBF, 0);
2114 if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2116 if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2118 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2121 state = (CState *) pg_malloc(sizeof(CState));
2122 memset(state, 0, sizeof(CState));
2124 while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
2132 pghost = pg_strdup(optarg);
2138 do_vacuum_accounts++;
2141 pgport = pg_strdup(optarg);
2153 nclients = atoi(optarg);
2154 if (nclients <= 0 || nclients > MAXCLIENTS)
2156 fprintf(stderr, "invalid number of clients: %d\n", nclients);
2159 #ifdef HAVE_GETRLIMIT
2160 #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */
2161 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2162 #else /* but BSD doesn't ... */
2163 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2164 #endif /* RLIMIT_NOFILE */
2166 fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2169 if (rlim.rlim_cur <= (nclients + 2))
2171 fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2172 fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2175 #endif /* HAVE_GETRLIMIT */
2177 case 'j': /* jobs */
2178 nthreads = atoi(optarg);
2181 fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2189 is_latencies = true;
2193 scale = atoi(optarg);
2196 fprintf(stderr, "invalid scaling factor: %d\n", scale);
2203 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2206 nxacts = atoi(optarg);
2209 fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2216 fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2219 duration = atoi(optarg);
2222 fprintf(stderr, "invalid duration: %d\n", duration);
2227 login = pg_strdup(optarg);
2237 filename = pg_strdup(optarg);
2238 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2245 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2247 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2252 if (!putVariable(&state[0], "option", optarg, p))
2257 fillfactor = atoi(optarg);
2258 if ((fillfactor < 10) || (fillfactor > 100))
2260 fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2267 fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
2270 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2271 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2273 if (querymode >= NUM_QUERYMODE)
2275 fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2280 /* This covers long options which take no argument. */
2282 case 2: /* tablespace */
2283 tablespace = pg_strdup(optarg);
2285 case 3: /* index-tablespace */
2286 index_tablespace = pg_strdup(optarg);
2289 sample_rate = atof(optarg);
2290 if (sample_rate <= 0.0 || sample_rate > 1.0)
2292 fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2297 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2304 dbName = argv[optind];
2307 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2309 else if (login != NULL && *login != '\0')
2321 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2322 if (nxacts <= 0 && duration <= 0)
2323 nxacts = DEFAULT_NXACTS;
2325 if (nclients % nthreads != 0)
2327 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2331 /* --sampling-rate may be used only with -l */
2332 if (sample_rate > 0.0 && !use_log)
2334 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2338 /* -q may be used only with -i */
2339 if (use_quiet && !is_init_mode)
2341 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2346 * is_latencies only works with multiple threads in thread-based
2347 * implementations, not fork-based ones, because it supposes that the
2348 * parent can see changes made to the per-thread execution stats by child
2349 * threads. It seems useful enough to accept despite this limitation, but
2350 * perhaps we should FIXME someday (by passing the stats data back up
2351 * through the parent-to-child pipes).
2353 #ifndef ENABLE_THREAD_SAFETY
2354 if (is_latencies && nthreads > 1)
2356 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2362 * save main process id in the global variable because process id will be
2363 * changed after fork.
2365 main_pid = (int) getpid();
2369 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2370 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2372 /* copy any -D switch values to all clients */
2373 for (i = 1; i < nclients; i++)
2378 for (j = 0; j < state[0].nvariables; j++)
2380 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2389 printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2390 pghost, pgport, nclients, nxacts, dbName);
2392 printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2393 pghost, pgport, nclients, duration, dbName);
2396 /* opening connection... */
2401 if (PQstatus(con) == CONNECTION_BAD)
2403 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2404 fprintf(stderr, "%s", PQerrorMessage(con));
2411 * get the scaling factor that should be same as count(*) from
2412 * pgbench_branches if this is not a custom query
2414 res = PQexec(con, "select count(*) from pgbench_branches");
2415 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2417 fprintf(stderr, "%s", PQerrorMessage(con));
2420 scale = atoi(PQgetvalue(res, 0, 0));
2423 fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2428 /* warn if we override user-given -s switch */
2431 "Scale option ignored, using pgbench_branches table count = %d\n",
2436 * :scale variables normally get -s or database scale, but don't override
2437 * an explicit -D switch
2439 if (getVariable(&state[0], "scale") == NULL)
2441 snprintf(val, sizeof(val), "%d", scale);
2442 for (i = 0; i < nclients; i++)
2444 if (!putVariable(&state[i], "startup", "scale", val))
2451 fprintf(stderr, "starting vacuum...");
2452 executeStatement(con, "vacuum pgbench_branches");
2453 executeStatement(con, "vacuum pgbench_tellers");
2454 executeStatement(con, "truncate pgbench_history");
2455 fprintf(stderr, "end.\n");
2457 if (do_vacuum_accounts)
2459 fprintf(stderr, "starting vacuum pgbench_accounts...");
2460 executeStatement(con, "vacuum analyze pgbench_accounts");
2461 fprintf(stderr, "end.\n");
2466 /* set random seed */
2467 INSTR_TIME_SET_CURRENT(start_time);
2468 srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2470 /* process builtin SQL scripts */
2474 sql_files[0] = process_builtin(tpc_b);
2479 sql_files[0] = process_builtin(select_only);
2484 sql_files[0] = process_builtin(simple_update);
2492 /* set up thread data structures */
2493 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2494 for (i = 0; i < nthreads; i++)
2496 TState *thread = &threads[i];
2499 thread->state = &state[nclients / nthreads * i];
2500 thread->nstate = nclients / nthreads;
2501 thread->random_state[0] = random();
2502 thread->random_state[1] = random();
2503 thread->random_state[2] = random();
2507 /* Reserve memory for the thread to store per-command latencies */
2510 thread->exec_elapsed = (instr_time *)
2511 pg_malloc(sizeof(instr_time) * num_commands);
2512 thread->exec_count = (int *)
2513 pg_malloc(sizeof(int) * num_commands);
2515 for (t = 0; t < num_commands; t++)
2517 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2518 thread->exec_count[t] = 0;
2523 thread->exec_elapsed = NULL;
2524 thread->exec_count = NULL;
2528 /* get start up time */
2529 INSTR_TIME_SET_CURRENT(start_time);
2531 /* set alarm if duration is specified. */
2536 for (i = 0; i < nthreads; i++)
2538 TState *thread = &threads[i];
2540 INSTR_TIME_SET_CURRENT(thread->start_time);
2542 /* the first thread (i = 0) is executed by main thread */
2545 int err = pthread_create(&thread->thread, NULL, threadRun, thread);
2547 if (err != 0 || thread->thread == INVALID_THREAD)
2549 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2555 thread->thread = INVALID_THREAD;
2559 /* wait for threads and accumulate results */
2561 INSTR_TIME_SET_ZERO(conn_total_time);
2562 for (i = 0; i < nthreads; i++)
2566 if (threads[i].thread == INVALID_THREAD)
2567 ret = threadRun(&threads[i]);
2569 pthread_join(threads[i].thread, &ret);
2573 TResult *r = (TResult *) ret;
2575 total_xacts += r->xacts;
2576 INSTR_TIME_ADD(conn_total_time, r->conn_time);
2580 disconnect_all(state, nclients);
2583 INSTR_TIME_SET_CURRENT(total_time);
2584 INSTR_TIME_SUBTRACT(total_time, start_time);
2585 printResults(ttype, total_xacts, nclients, threads, nthreads,
2586 total_time, conn_total_time);
2592 threadRun(void *arg)
2594 TState *thread = (TState *) arg;
2595 CState *state = thread->state;
2597 FILE *logfile = NULL; /* per-thread log file */
2600 int nstate = thread->nstate;
2601 int remains = nstate; /* number of remaining clients */
2604 result = pg_malloc(sizeof(TResult));
2605 INSTR_TIME_SET_ZERO(result->conn_time);
2607 /* open log file if requested */
2612 if (thread->tid == 0)
2613 snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
2615 snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
2616 logfile = fopen(logpath, "w");
2618 if (logfile == NULL)
2620 fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
2627 /* make connections to the database */
2628 for (i = 0; i < nstate; i++)
2630 if ((state[i].con = doConnect()) == NULL)
2635 /* time after thread and connections set up */
2636 INSTR_TIME_SET_CURRENT(result->conn_time);
2637 INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
2639 /* send start up queries in async manner */
2640 for (i = 0; i < nstate; i++)
2642 CState *st = &state[i];
2643 Command **commands = sql_files[st->use_file];
2644 int prev_ecnt = st->ecnt;
2646 st->use_file = getrand(thread, 0, num_files - 1);
2647 if (!doCustom(thread, st, &result->conn_time, logfile))
2648 remains--; /* I've aborted */
2650 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2652 fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
2653 remains--; /* I've aborted */
2662 int maxsock; /* max socket number to be waited */
2666 FD_ZERO(&input_mask);
2669 min_usec = INT64_MAX;
2670 for (i = 0; i < nstate; i++)
2672 CState *st = &state[i];
2673 Command **commands = sql_files[st->use_file];
2680 if (min_usec == INT64_MAX)
2684 INSTR_TIME_SET_CURRENT(now);
2685 now_usec = INSTR_TIME_GET_MICROSEC(now);
2688 this_usec = st->until - now_usec;
2689 if (min_usec > this_usec)
2690 min_usec = this_usec;
2692 else if (st->con == NULL)
2696 else if (commands[st->state]->type == META_COMMAND)
2698 min_usec = 0; /* the connection is ready to run */
2702 sock = PQsocket(st->con);
2705 fprintf(stderr, "bad socket: %s\n", strerror(errno));
2709 FD_SET(sock, &input_mask);
2715 if (min_usec > 0 && maxsock != -1)
2717 int nsocks; /* return from select(2) */
2719 if (min_usec != INT64_MAX)
2721 struct timeval timeout;
2723 timeout.tv_sec = min_usec / 1000000;
2724 timeout.tv_usec = min_usec % 1000000;
2725 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
2728 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
2733 /* must be something wrong */
2734 fprintf(stderr, "select failed: %s\n", strerror(errno));
2739 /* ok, backend returns reply */
2740 for (i = 0; i < nstate; i++)
2742 CState *st = &state[i];
2743 Command **commands = sql_files[st->use_file];
2744 int prev_ecnt = st->ecnt;
2746 if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
2747 || commands[st->state]->type == META_COMMAND))
2749 if (!doCustom(thread, st, &result->conn_time, logfile))
2750 remains--; /* I've aborted */
2753 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2755 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
2756 remains--; /* I've aborted */
2764 INSTR_TIME_SET_CURRENT(start);
2765 disconnect_all(state, nstate);
2767 for (i = 0; i < nstate; i++)
2768 result->xacts += state[i].cnt;
2769 INSTR_TIME_SET_CURRENT(end);
2770 INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
2778 * Support for duration option: set timer_exceeded after so many seconds.
2784 handle_sig_alarm(SIGNAL_ARGS)
2786 timer_exceeded = true;
2790 setalarm(int seconds)
2792 pqsignal(SIGALRM, handle_sig_alarm);
2796 #ifndef ENABLE_THREAD_SAFETY
2799 * implements pthread using fork.
2802 typedef struct fork_pthread
2809 pthread_create(pthread_t *thread,
2810 pthread_attr_t *attr,
2811 void *(*start_routine) (void *),
2817 th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
2818 if (pipe(th->pipes) < 0)
2825 if (th->pid == -1) /* error */
2830 if (th->pid != 0) /* in parent process */
2832 close(th->pipes[1]);
2837 /* in child process */
2838 close(th->pipes[0]);
2840 /* set alarm again because the child does not inherit timers */
2844 ret = start_routine(arg);
2845 write(th->pipes[1], ret, sizeof(TResult));
2846 close(th->pipes[1]);
2852 pthread_join(pthread_t th, void **thread_return)
2856 while (waitpid(th->pid, &status, 0) != th->pid)
2862 if (thread_return != NULL)
2864 /* assume result is TResult */
2865 *thread_return = pg_malloc(sizeof(TResult));
2866 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
2868 free(*thread_return);
2869 *thread_return = NULL;
2872 close(th->pipes[0]);
2880 static VOID CALLBACK
2881 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
2883 timer_exceeded = true;
2887 setalarm(int seconds)
2892 /* This function will be called at most once, so we can cheat a bit. */
2893 queue = CreateTimerQueue();
2894 if (seconds > ((DWORD) -1) / 1000 ||
2895 !CreateTimerQueueTimer(&timer, queue,
2896 win32_timer_callback, NULL, seconds * 1000, 0,
2897 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
2899 fprintf(stderr, "Failed to set timer\n");
2904 /* partial pthread implementation for Windows */
2906 typedef struct win32_pthread
2909 void *(*routine) (void *);
2914 static unsigned __stdcall
2915 win32_pthread_run(void *arg)
2917 win32_pthread *th = (win32_pthread *) arg;
2919 th->result = th->routine(th->arg);
2925 pthread_create(pthread_t *thread,
2926 pthread_attr_t *attr,
2927 void *(*start_routine) (void *),
2933 th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
2934 th->routine = start_routine;
2938 th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
2939 if (th->handle == NULL)
2951 pthread_join(pthread_t th, void **thread_return)
2953 if (th == NULL || th->handle == NULL)
2954 return errno = EINVAL;
2956 if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
2958 _dosmaperr(GetLastError());
2963 *thread_return = th->result;
2965 CloseHandle(th->handle);