]> granicus.if.org Git - postgresql/blob - contrib/pgbench/pgbench.c
Allow pgbench to use a scale larger than 21474.
[postgresql] / contrib / pgbench / pgbench.c
1 /*
2  * pgbench.c
3  *
4  * A simple benchmark program for PostgreSQL
5  * Originally written by Tatsuo Ishii and enhanced by many contributors.
6  *
7  * contrib/pgbench/pgbench.c
8  * Copyright (c) 2000-2013, PostgreSQL Global Development Group
9  * ALL RIGHTS RESERVED;
10  *
11  * Permission to use, copy, modify, and distribute this software and its
12  * documentation for any purpose, without fee, and without a written agreement
13  * is hereby granted, provided that the above copyright notice and this
14  * paragraph and the following two paragraphs appear in all copies.
15  *
16  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20  * POSSIBILITY OF SUCH DAMAGE.
21  *
22  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
25  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
27  *
28  */
29
30 #ifdef WIN32
31 #define FD_SETSIZE 1024                 /* set before winsock2.h is included */
32 #endif   /* ! WIN32 */
33
34 #include "postgres_fe.h"
35
36 #include "getopt_long.h"
37 #include "libpq-fe.h"
38 #include "libpq/pqsignal.h"
39 #include "portability/instr_time.h"
40
41 #include <ctype.h>
42 #include <math.h>
43
44 #ifndef WIN32
45 #include <sys/time.h>
46 #include <unistd.h>
47 #endif   /* ! WIN32 */
48
49 #ifdef HAVE_SYS_SELECT_H
50 #include <sys/select.h>
51 #endif
52
53 #ifdef HAVE_SYS_RESOURCE_H
54 #include <sys/resource.h>               /* for getrlimit */
55 #endif
56
57 #ifndef INT64_MAX
58 #define INT64_MAX       INT64CONST(0x7FFFFFFFFFFFFFFF)
59 #endif
60
/*
 * Thread support.
 *
 * On Windows, native win32 threads are used behind a small pthread-style API.
 * Builds with ENABLE_THREAD_SAFETY use the platform's <pthread.h>.  Anything
 * else emulates threads with fork(); in that case the pthread identifiers are
 * macro-renamed to pg_* names so they cannot clash with system declarations.
 */
#if defined(WIN32)
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);

#elif defined(ENABLE_THREAD_SAFETY)
#include <pthread.h>

#else
#include <sys/wait.h>

/* rename first, so the typedefs and prototypes below use the pg_* names */
#define pthread_t				pg_pthread_t
#define pthread_attr_t			pg_pthread_attr_t
#define pthread_create			pg_pthread_create
#define pthread_join			pg_pthread_join

typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;

static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int	pthread_join(pthread_t th, void **thread_return);
#endif
91
92 extern char *optarg;
93 extern int      optind;
94
95
96 /********************************************************************
97  * some configurable parameters */
98
99 /* max number of clients allowed */
100 #ifdef FD_SETSIZE
101 #define MAXCLIENTS      (FD_SETSIZE - 10)
102 #else
103 #define MAXCLIENTS      1024
104 #endif
105
106 #define LOG_STEP_SECONDS        5       /* seconds between log messages */
107 #define DEFAULT_NXACTS  10              /* default nxacts */
108
109 int                     nxacts = 0;                     /* number of transactions per client */
110 int                     duration = 0;           /* duration in seconds */
111
112 /*
113  * scaling factor. for example, scale = 10 will make 1000000 tuples in
114  * pgbench_accounts table.
115  */
116 int                     scale = 1;
117
118 /*
119  * fillfactor. for example, fillfactor = 90 will use only 90 percent
120  * space during inserts and leave 10 percent free.
121  */
122 int                     fillfactor = 100;
123
124 /*
125  * create foreign key constraints on the tables?
126  */
127 int                     foreign_keys = 0;
128
129 /*
130  * use unlogged tables?
131  */
132 int                     unlogged_tables = 0;
133
134 /*
135  * log sampling rate (1.0 = log everything, 0.0 = option not given)
136  */
137 double          sample_rate = 0.0;
138
139 /*
140  * tablespace selection
141  */
142 char       *tablespace = NULL;
143 char       *index_tablespace = NULL;
144
145 /*
146  * end of configurable parameters
147  *********************************************************************/
148
/*
 * Rows per unit of scale in pgbench_branches, pgbench_tellers and
 * pgbench_accounts; the built-in scripts multiply these by :scale.
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * Scale factor at or beyond which account ids (naccounts * scale) are treated
 * as too large for 32-bit integers.
 *
 * The largest scale for which naccounts * scale still fits in a signed 32-bit
 * integer is 21474 (21475 * 100000 exceeds INT32_MAX).  We use 20000 instead
 * because it is easier to document and remember, and it stays a little below
 * the real limit.
 */
#define SCALE_32BIT_THRESHOLD 20000

bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
bool		is_connect;			/* establish connection for each transaction */
bool		is_latencies;		/* report per-command latencies */
int			main_pid;			/* main process id used in log filename */

/* Connection settings handed to libpq by doConnect(). */
char	   *pghost = "";
char	   *pgport = "";
char	   *login = NULL;
char	   *dbName;
const char *progname;

volatile bool timer_exceeded = false;	/* flag from signal handler */
176
/* A script variable; both its name and its value are stored as text. */
typedef struct
{
	char	   *name;			/* variable name */
	char	   *value;			/* current value */
} Variable;

#define MAX_FILES			128 /* max number of SQL script files allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
186
187 /*
188  * structures used in custom query mode
189  */
190
191 typedef struct
192 {
193         PGconn     *con;                        /* connection handle to DB */
194         int                     id;                             /* client No. */
195         int                     state;                  /* state No. */
196         int                     cnt;                    /* xacts count */
197         int                     ecnt;                   /* error count */
198         int                     listen;                 /* 0 indicates that an async query has been
199                                                                  * sent */
200         int                     sleeping;               /* 1 indicates that the client is napping */
201         int64           until;                  /* napping until (usec) */
202         Variable   *variables;          /* array of variable definitions */
203         int                     nvariables;
204         instr_time      txn_begin;              /* used for measuring transaction latencies */
205         instr_time      stmt_begin;             /* used for measuring statement latencies */
206         int                     use_file;               /* index in sql_files for this client */
207         bool            prepared[MAX_FILES];
208 } CState;
209
210 /*
211  * Thread state and result
212  */
213 typedef struct
214 {
215         int                     tid;                    /* thread id */
216         pthread_t       thread;                 /* thread handle */
217         CState     *state;                      /* array of CState */
218         int                     nstate;                 /* length of state[] */
219         instr_time      start_time;             /* thread start time */
220         instr_time *exec_elapsed;       /* time spent executing cmds (per Command) */
221         int                *exec_count;         /* number of cmd executions (per Command) */
222         unsigned short random_state[3];         /* separate randomness for each thread */
223 } TState;
224
225 #define INVALID_THREAD          ((pthread_t) 0)
226
227 typedef struct
228 {
229         instr_time      conn_time;
230         int                     xacts;
231 } TResult;
232
/*
 * Commands read from script files.
 */
#define SQL_COMMAND		1
#define META_COMMAND	2
#define MAX_ARGS		10

/* Protocol used to submit queries to the server (-M option). */
typedef enum QueryMode
{
	QUERY_SIMPLE = 0,			/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE				/* count of modes above, not a mode itself */
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;

/* Spelling of each mode, indexed by QueryMode. */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};

typedef struct
{
	char	   *line;			/* full text of command line */
	int			command_num;	/* unique index of this Command struct */
	int			type;			/* SQL_COMMAND or META_COMMAND */
	int			argc;			/* number of command words */
	char	   *argv[MAX_ARGS]; /* command word list */
} Command;
259
260 static Command **sql_files[MAX_FILES];  /* SQL script files */
261 static int      num_files;                      /* number of script files */
262 static int      num_commands = 0;       /* total number of Command structs */
263 static int      debug = 0;                      /* debug flag */
264
265 /* default scenario */
266 static char *tpc_b = {
267         "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
268         "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
269         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
270         "\\setrandom aid 1 :naccounts\n"
271         "\\setrandom bid 1 :nbranches\n"
272         "\\setrandom tid 1 :ntellers\n"
273         "\\setrandom delta -5000 5000\n"
274         "BEGIN;\n"
275         "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
276         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
277         "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
278         "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
279         "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
280         "END;\n"
281 };
282
283 /* -N case */
284 static char *simple_update = {
285         "\\set nbranches " CppAsString2(nbranches) " * :scale\n"
286         "\\set ntellers " CppAsString2(ntellers) " * :scale\n"
287         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
288         "\\setrandom aid 1 :naccounts\n"
289         "\\setrandom bid 1 :nbranches\n"
290         "\\setrandom tid 1 :ntellers\n"
291         "\\setrandom delta -5000 5000\n"
292         "BEGIN;\n"
293         "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
294         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
295         "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
296         "END;\n"
297 };
298
299 /* -S case */
300 static char *select_only = {
301         "\\set naccounts " CppAsString2(naccounts) " * :scale\n"
302         "\\setrandom aid 1 :naccounts\n"
303         "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
304 };
305
306 /* Function prototypes */
307 static void setalarm(int seconds);
308 static void *threadRun(void *arg);
309
310
/*
 * Allocation wrappers: on failure they print "out of memory" to stderr and
 * exit(1), so callers never receive NULL.
 */
static void *
pg_malloc(size_t size)
{
	/* malloc(0) behaves differently across platforms; ask for 1 byte instead */
	void	   *block = malloc(size > 0 ? size : 1);

	if (block == NULL)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}
	return block;
}
330
/*
 * realloc() wrapper that prints "out of memory" and exits on failure.
 *
 * A zero size is always bumped to 1 byte.  realloc(NULL, 0) is unportable,
 * and realloc(ptr, 0) on a live pointer may free it and return NULL (and is
 * undefined in C23).  Without the bump, the NULL result would be misreported
 * as out-of-memory and the program would exit.
 */
static void *
pg_realloc(void *ptr, size_t size)
{
	void	   *result;

	if (size == 0)
		size = 1;
	result = realloc(ptr, size);
	if (!result)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}
	return result;
}
347
/*
 * Return a malloc'd copy of the NUL-terminated string s; print "out of
 * memory" and exit(1) if the allocation fails.
 *
 * Implemented with malloc()+memcpy() instead of POSIX strdup(), which is not
 * declared under strict ISO C modes (e.g. -std=c11).  There an implicit
 * declaration would truncate the returned pointer to int.
 */
static char *
pg_strdup(const char *s)
{
	size_t		size = strlen(s) + 1;	/* include the terminating NUL */
	char	   *result = malloc(size);

	if (!result)
	{
		fprintf(stderr, "out of memory\n");
		exit(1);
	}
	memcpy(result, s, size);
	return result;
}
361
362
363 static void
364 usage(void)
365 {
366         printf("%s is a benchmarking tool for PostgreSQL.\n\n"
367                    "Usage:\n"
368                    "  %s [OPTION]... [DBNAME]\n"
369                    "\nInitialization options:\n"
370                    "  -i           invokes initialization mode\n"
371                    "  -n           do not run VACUUM after initialization\n"
372                    "  -F NUM       fill factor\n"
373                    "  -s NUM       scaling factor\n"
374                    "  -q           quiet logging (one message each 5 seconds)\n"
375                    "  --foreign-keys\n"
376                    "               create foreign key constraints between tables\n"
377                    "  --index-tablespace=TABLESPACE\n"
378                    "               create indexes in the specified tablespace\n"
379                    "  --tablespace=TABLESPACE\n"
380                    "               create tables in the specified tablespace\n"
381                    "  --unlogged-tables\n"
382                    "               create tables as unlogged tables\n"
383                    "\nBenchmarking options:\n"
384                 "  -c NUM       number of concurrent database clients (default: 1)\n"
385                    "  -C           establish new connection for each transaction\n"
386                    "  -D VARNAME=VALUE\n"
387                    "               define variable for use by custom script\n"
388                    "  -f FILENAME  read transaction script from FILENAME\n"
389                    "  -j NUM       number of threads (default: 1)\n"
390                    "  -l           write transaction times to log file\n"
391                    "  --sampling-rate NUM\n"
392                    "               fraction of transactions to log (e.g. 0.01 for 1%% sample)\n"
393                    "  -M simple|extended|prepared\n"
394                    "               protocol for submitting queries to server (default: simple)\n"
395                    "  -n           do not run VACUUM before tests\n"
396                    "  -N           do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
397                    "  -r           report average latency per command\n"
398                    "  -s NUM       report this scale factor in output\n"
399                    "  -S           perform SELECT-only transactions\n"
400          "  -t NUM       number of transactions each client runs (default: 10)\n"
401                    "  -T NUM       duration of benchmark test in seconds\n"
402                    "  -v           vacuum all four standard tables before tests\n"
403                    "\nCommon options:\n"
404                    "  -d             print debugging output\n"
405                    "  -h HOSTNAME    database server host or socket directory\n"
406                    "  -p PORT        database server port number\n"
407                    "  -U USERNAME    connect as specified database user\n"
408                    "  -V, --version  output version information, then exit\n"
409                    "  -?, --help     show this help, then exit\n"
410                    "\n"
411                    "Report bugs to <pgsql-bugs@postgresql.org>.\n",
412                    progname, progname);
413 }
414
415 /*
416  * strtoint64 -- convert a string to 64-bit integer
417  *
418  * This function is a modified version of scanint8() from
419  * src/backend/utils/adt/int8.c.
420  */
421 static int64
422 strtoint64(const char *str)
423 {
424         const char *ptr = str;
425         int64           result = 0;
426         int                     sign = 1;
427
428         /*
429          * Do our own scan, rather than relying on sscanf which might be broken
430          * for long long.
431          */
432
433         /* skip leading spaces */
434         while (*ptr && isspace((unsigned char) *ptr))
435                 ptr++;
436
437         /* handle sign */
438         if (*ptr == '-')
439         {
440                 ptr++;
441
442                 /*
443                  * Do an explicit check for INT64_MIN.  Ugly though this is, it's
444                  * cleaner than trying to get the loop below to handle it portably.
445                  */
446                 if (strncmp(ptr, "9223372036854775808", 19) == 0)
447                 {
448                         result = -INT64CONST(0x7fffffffffffffff) - 1;
449                         ptr += 19;
450                         goto gotdigits;
451                 }
452                 sign = -1;
453         }
454         else if (*ptr == '+')
455                 ptr++;
456
457         /* require at least one digit */
458         if (!isdigit((unsigned char) *ptr))
459                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
460
461         /* process digits */
462         while (*ptr && isdigit((unsigned char) *ptr))
463         {
464                 int64           tmp = result * 10 + (*ptr++ - '0');
465
466                 if ((tmp / 10) != result)               /* overflow? */
467                         fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
468                 result = tmp;
469         }
470
471 gotdigits:
472
473         /* allow trailing whitespace, but not other trailing chars */
474         while (*ptr != '\0' && isspace((unsigned char) *ptr))
475                 ptr++;
476
477         if (*ptr != '\0')
478                 fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
479
480         return ((sign < 0) ? -result : result);
481 }
482
483 /* random number generator: uniform distribution from min to max inclusive */
484 static int64
485 getrand(TState *thread, int64 min, int64 max)
486 {
487         /*
488          * Odd coding is so that min and max have approximately the same chance of
489          * being selected as do numbers between them.
490          *
491          * pg_erand48() is thread-safe and concurrent, which is why we use it
492          * rather than random(), which in glibc is non-reentrant, and therefore
493          * protected by a mutex, and therefore a bottleneck on machines with many
494          * CPUs.
495          */
496         return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
497 }
498
499 /* call PQexec() and exit() on failure */
500 static void
501 executeStatement(PGconn *con, const char *sql)
502 {
503         PGresult   *res;
504
505         res = PQexec(con, sql);
506         if (PQresultStatus(res) != PGRES_COMMAND_OK)
507         {
508                 fprintf(stderr, "%s", PQerrorMessage(con));
509                 exit(1);
510         }
511         PQclear(res);
512 }
513
514 /* set up a connection to the backend */
515 static PGconn *
516 doConnect(void)
517 {
518         PGconn     *conn;
519         static char *password = NULL;
520         bool            new_pass;
521
522         /*
523          * Start the connection.  Loop until we have a password if requested by
524          * backend.
525          */
526         do
527         {
528 #define PARAMS_ARRAY_SIZE       7
529
530                 const char *keywords[PARAMS_ARRAY_SIZE];
531                 const char *values[PARAMS_ARRAY_SIZE];
532
533                 keywords[0] = "host";
534                 values[0] = pghost;
535                 keywords[1] = "port";
536                 values[1] = pgport;
537                 keywords[2] = "user";
538                 values[2] = login;
539                 keywords[3] = "password";
540                 values[3] = password;
541                 keywords[4] = "dbname";
542                 values[4] = dbName;
543                 keywords[5] = "fallback_application_name";
544                 values[5] = progname;
545                 keywords[6] = NULL;
546                 values[6] = NULL;
547
548                 new_pass = false;
549
550                 conn = PQconnectdbParams(keywords, values, true);
551
552                 if (!conn)
553                 {
554                         fprintf(stderr, "Connection to database \"%s\" failed\n",
555                                         dbName);
556                         return NULL;
557                 }
558
559                 if (PQstatus(conn) == CONNECTION_BAD &&
560                         PQconnectionNeedsPassword(conn) &&
561                         password == NULL)
562                 {
563                         PQfinish(conn);
564                         password = simple_prompt("Password: ", 100, false);
565                         new_pass = true;
566                 }
567         } while (new_pass);
568
569         /* check to see that the backend connection was successfully made */
570         if (PQstatus(conn) == CONNECTION_BAD)
571         {
572                 fprintf(stderr, "Connection to database \"%s\" failed:\n%s",
573                                 dbName, PQerrorMessage(conn));
574                 PQfinish(conn);
575                 return NULL;
576         }
577
578         return conn;
579 }
580
581 /* throw away response from backend */
582 static void
583 discard_response(CState *state)
584 {
585         PGresult   *res;
586
587         do
588         {
589                 res = PQgetResult(state->con);
590                 if (res)
591                         PQclear(res);
592         } while (res);
593 }
594
595 static int
596 compareVariables(const void *v1, const void *v2)
597 {
598         return strcmp(((const Variable *) v1)->name,
599                                   ((const Variable *) v2)->name);
600 }
601
602 static char *
603 getVariable(CState *st, char *name)
604 {
605         Variable        key,
606                            *var;
607
608         /* On some versions of Solaris, bsearch of zero items dumps core */
609         if (st->nvariables <= 0)
610                 return NULL;
611
612         key.name = name;
613         var = (Variable *) bsearch((void *) &key,
614                                                            (void *) st->variables,
615                                                            st->nvariables,
616                                                            sizeof(Variable),
617                                                            compareVariables);
618         if (var != NULL)
619                 return var->value;
620         else
621                 return NULL;
622 }
623
/*
 * Check whether name is a legal variable name.  It must be non-empty and
 * consist only of isalnum() characters and underscores.
 *
 * An empty name is rejected because a ":name" reference can never match it:
 * parseVariable() requires at least one name character after the colon.
 */
static bool
isLegalVariableName(const char *name)
{
	const unsigned char *p = (const unsigned char *) name;

	if (*p == '\0')
		return false;

	for (; *p != '\0'; p++)
	{
		if (!isalnum(*p) && *p != '_')
			return false;
	}

	return true;
}
638
639 static int
640 putVariable(CState *st, const char *context, char *name, char *value)
641 {
642         Variable        key,
643                            *var;
644
645         key.name = name;
646         /* On some versions of Solaris, bsearch of zero items dumps core */
647         if (st->nvariables > 0)
648                 var = (Variable *) bsearch((void *) &key,
649                                                                    (void *) st->variables,
650                                                                    st->nvariables,
651                                                                    sizeof(Variable),
652                                                                    compareVariables);
653         else
654                 var = NULL;
655
656         if (var == NULL)
657         {
658                 Variable   *newvars;
659
660                 /*
661                  * Check for the name only when declaring a new variable to avoid
662                  * overhead.
663                  */
664                 if (!isLegalVariableName(name))
665                 {
666                         fprintf(stderr, "%s: invalid variable name '%s'\n", context, name);
667                         return false;
668                 }
669
670                 if (st->variables)
671                         newvars = (Variable *) pg_realloc(st->variables,
672                                                                         (st->nvariables + 1) * sizeof(Variable));
673                 else
674                         newvars = (Variable *) pg_malloc(sizeof(Variable));
675
676                 st->variables = newvars;
677
678                 var = &newvars[st->nvariables];
679
680                 var->name = pg_strdup(name);
681                 var->value = pg_strdup(value);
682
683                 st->nvariables++;
684
685                 qsort((void *) st->variables, st->nvariables, sizeof(Variable),
686                           compareVariables);
687         }
688         else
689         {
690                 char       *val;
691
692                 /* dup then free, in case value is pointing at this variable */
693                 val = pg_strdup(value);
694
695                 free(var->value);
696                 var->value = val;
697         }
698
699         return true;
700 }
701
/*
 * Parse a variable reference at sql.  sql[0] is skipped (callers pass a
 * pointer to the ':').  The name is the run of isalnum()/'_' characters that
 * follows it.
 *
 * Returns a newly allocated copy of the name and sets *eaten to the number of
 * characters consumed, including sql[0].  Returns NULL, leaving *eaten
 * untouched, if no name character follows.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			end = 1;		/* index just past the name */
	char	   *name;

	while (isalnum((unsigned char) sql[end]) || sql[end] == '_')
		end++;
	if (end == 1)
		return NULL;

	name = pg_malloc(end);
	memcpy(name, sql + 1, end - 1);
	name[end - 1] = '\0';

	*eaten = end;
	return name;
}
722
/*
 * Replace the len characters at param, which points inside the malloc'd
 * string *sql, with value.  *sql is reallocated when it must grow, so it may
 * move.  Returns a pointer just past the inserted value in the (possibly new)
 * buffer.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);

	if (vlen > len)
	{
		/* remember param as an offset, since realloc may move the buffer */
		size_t		pos = param - *sql;

		*sql = pg_realloc(*sql, strlen(*sql) - len + vlen + 1);
		param = *sql + pos;
	}

	/* slide the tail, terminating NUL included, so the value fits exactly */
	if (vlen != len)
		memmove(param + vlen, param + len, strlen(param + len) + 1);
	memcpy(param, value, vlen);

	return param + vlen;
}
742
743 static char *
744 assignVariables(CState *st, char *sql)
745 {
746         char       *p,
747                            *name,
748                            *val;
749
750         p = sql;
751         while ((p = strchr(p, ':')) != NULL)
752         {
753                 int                     eaten;
754
755                 name = parseVariable(p, &eaten);
756                 if (name == NULL)
757                 {
758                         while (*p == ':')
759                         {
760                                 p++;
761                         }
762                         continue;
763                 }
764
765                 val = getVariable(st, name);
766                 free(name);
767                 if (val == NULL)
768                 {
769                         p++;
770                         continue;
771                 }
772
773                 p = replaceVariable(&sql, p, eaten, val);
774         }
775
776         return sql;
777 }
778
779 static void
780 getQueryParams(CState *st, const Command *command, const char **params)
781 {
782         int                     i;
783
784         for (i = 0; i < command->argc - 1; i++)
785                 params[i] = getVariable(st, command->argv[i + 1]);
786 }
787
788 /*
789  * Run a shell command. The result is assigned to the variable if not NULL.
790  * Return true if succeeded, or false on error.
791  */
792 static bool
793 runShellCommand(CState *st, char *variable, char **argv, int argc)
794 {
795         char            command[SHELL_COMMAND_SIZE];
796         int                     i,
797                                 len = 0;
798         FILE       *fp;
799         char            res[64];
800         char       *endptr;
801         int                     retval;
802
803         /*----------
804          * Join arguments with whitespace separators. Arguments starting with
805          * exactly one colon are treated as variables:
806          *      name - append a string "name"
807          *      :var - append a variable named 'var'
808          *      ::name - append a string ":name"
809          *----------
810          */
811         for (i = 0; i < argc; i++)
812         {
813                 char       *arg;
814                 int                     arglen;
815
816                 if (argv[i][0] != ':')
817                 {
818                         arg = argv[i];          /* a string literal */
819                 }
820                 else if (argv[i][1] == ':')
821                 {
822                         arg = argv[i] + 1;      /* a string literal starting with colons */
823                 }
824                 else if ((arg = getVariable(st, argv[i] + 1)) == NULL)
825                 {
826                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]);
827                         return false;
828                 }
829
830                 arglen = strlen(arg);
831                 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
832                 {
833                         fprintf(stderr, "%s: too long shell command\n", argv[0]);
834                         return false;
835                 }
836
837                 if (i > 0)
838                         command[len++] = ' ';
839                 memcpy(command + len, arg, arglen);
840                 len += arglen;
841         }
842
843         command[len] = '\0';
844
845         /* Fast path for non-assignment case */
846         if (variable == NULL)
847         {
848                 if (system(command))
849                 {
850                         if (!timer_exceeded)
851                                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
852                         return false;
853                 }
854                 return true;
855         }
856
857         /* Execute the command with pipe and read the standard output. */
858         if ((fp = popen(command, "r")) == NULL)
859         {
860                 fprintf(stderr, "%s: cannot launch shell command\n", argv[0]);
861                 return false;
862         }
863         if (fgets(res, sizeof(res), fp) == NULL)
864         {
865                 if (!timer_exceeded)
866                         fprintf(stderr, "%s: cannot read the result\n", argv[0]);
867                 return false;
868         }
869         if (pclose(fp) < 0)
870         {
871                 fprintf(stderr, "%s: cannot close shell command\n", argv[0]);
872                 return false;
873         }
874
875         /* Check whether the result is an integer and assign it to the variable */
876         retval = (int) strtol(res, &endptr, 10);
877         while (*endptr != '\0' && isspace((unsigned char) *endptr))
878                 endptr++;
879         if (*res == '\0' || *endptr != '\0')
880         {
881                 fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res);
882                 return false;
883         }
884         snprintf(res, sizeof(res), "%d", retval);
885         if (!putVariable(st, "setshell", variable, res))
886                 return false;
887
888 #ifdef DEBUG
889         printf("shell parameter name: %s, value: %s\n", argv[1], res);
890 #endif
891         return true;
892 }
893
/*
 * Size of a buffer holding a prepared-statement name, including the
 * terminating NUL.  "P<int>_<int>" needs at most 1 + 11 + 1 + 11 + 1 = 25
 * bytes, so 32 leaves some headroom.
 */
#define MAX_PREPARE_NAME		32

/*
 * Build the name under which the SQL command number "state" of script
 * "file" is prepared on the server, e.g. "P0_3".
 *
 * "buffer" must provide at least MAX_PREPARE_NAME bytes.  The write is
 * bounded to that size, so the caller's buffer can never be overrun even if
 * the format is changed later.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
900
901 static bool
902 clientDone(CState *st, bool ok)
903 {
904         (void) ok;                                      /* unused */
905
906         if (st->con != NULL)
907         {
908                 PQfinish(st->con);
909                 st->con = NULL;
910         }
911         return false;                           /* always false */
912 }
913
914 /* return false iff client should be disconnected */
915 static bool
916 doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile)
917 {
918         PGresult   *res;
919         Command   **commands;
920
921 top:
922         commands = sql_files[st->use_file];
923
924         if (st->sleeping)
925         {                                                       /* are we sleeping? */
926                 instr_time      now;
927
928                 INSTR_TIME_SET_CURRENT(now);
929                 if (st->until <= INSTR_TIME_GET_MICROSEC(now))
930                         st->sleeping = 0;       /* Done sleeping, go ahead with next command */
931                 else
932                         return true;            /* Still sleeping, nothing to do here */
933         }
934
935         if (st->listen)
936         {                                                       /* are we receiver? */
937                 if (commands[st->state]->type == SQL_COMMAND)
938                 {
939                         if (debug)
940                                 fprintf(stderr, "client %d receiving\n", st->id);
941                         if (!PQconsumeInput(st->con))
942                         {                                       /* there's something wrong */
943                                 fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
944                                 return clientDone(st, false);
945                         }
946                         if (PQisBusy(st->con))
947                                 return true;    /* don't have the whole result yet */
948                 }
949
950                 /*
951                  * command finished: accumulate per-command execution times in
952                  * thread-local data structure, if per-command latencies are requested
953                  */
954                 if (is_latencies)
955                 {
956                         instr_time      now;
957                         int                     cnum = commands[st->state]->command_num;
958
959                         INSTR_TIME_SET_CURRENT(now);
960                         INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
961                                                                   now, st->stmt_begin);
962                         thread->exec_count[cnum]++;
963                 }
964
965                 /*
966                  * if transaction finished, record the time it took in the log
967                  */
968                 if (logfile && commands[st->state + 1] == NULL)
969                 {
970                         instr_time      now;
971                         instr_time      diff;
972                         double          usec;
973
974                         /*
975                          * write the log entry if this row belongs to the random sample,
976                          * or no sampling rate was given which means log everything.
977                          */
978                         if (sample_rate == 0.0 ||
979                                 pg_erand48(thread->random_state) <= sample_rate)
980                         {
981
982                                 INSTR_TIME_SET_CURRENT(now);
983                                 diff = now;
984                                 INSTR_TIME_SUBTRACT(diff, st->txn_begin);
985                                 usec = (double) INSTR_TIME_GET_MICROSEC(diff);
986
987 #ifndef WIN32
988                                 /* This is more than we really ought to know about instr_time */
989                                 fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
990                                                 st->id, st->cnt, usec, st->use_file,
991                                                 (long) now.tv_sec, (long) now.tv_usec);
992 #else
993                                 /* On Windows, instr_time doesn't provide a timestamp anyway */
994                                 fprintf(logfile, "%d %d %.0f %d 0 0\n",
995                                                 st->id, st->cnt, usec, st->use_file);
996 #endif
997                         }
998                 }
999
1000                 if (commands[st->state]->type == SQL_COMMAND)
1001                 {
1002                         /*
1003                          * Read and discard the query result; note this is not included in
1004                          * the statement latency numbers.
1005                          */
1006                         res = PQgetResult(st->con);
1007                         switch (PQresultStatus(res))
1008                         {
1009                                 case PGRES_COMMAND_OK:
1010                                 case PGRES_TUPLES_OK:
1011                                         break;          /* OK */
1012                                 default:
1013                                         fprintf(stderr, "Client %d aborted in state %d: %s",
1014                                                         st->id, st->state, PQerrorMessage(st->con));
1015                                         PQclear(res);
1016                                         return clientDone(st, false);
1017                         }
1018                         PQclear(res);
1019                         discard_response(st);
1020                 }
1021
1022                 if (commands[st->state + 1] == NULL)
1023                 {
1024                         if (is_connect)
1025                         {
1026                                 PQfinish(st->con);
1027                                 st->con = NULL;
1028                         }
1029
1030                         ++st->cnt;
1031                         if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
1032                                 return clientDone(st, true);    /* exit success */
1033                 }
1034
1035                 /* increment state counter */
1036                 st->state++;
1037                 if (commands[st->state] == NULL)
1038                 {
1039                         st->state = 0;
1040                         st->use_file = (int) getrand(thread, 0, num_files - 1);
1041                         commands = sql_files[st->use_file];
1042                 }
1043         }
1044
1045         if (st->con == NULL)
1046         {
1047                 instr_time      start,
1048                                         end;
1049
1050                 INSTR_TIME_SET_CURRENT(start);
1051                 if ((st->con = doConnect()) == NULL)
1052                 {
1053                         fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
1054                         return clientDone(st, false);
1055                 }
1056                 INSTR_TIME_SET_CURRENT(end);
1057                 INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
1058         }
1059
1060         /* Record transaction start time if logging is enabled */
1061         if (logfile && st->state == 0)
1062                 INSTR_TIME_SET_CURRENT(st->txn_begin);
1063
1064         /* Record statement start time if per-command latencies are requested */
1065         if (is_latencies)
1066                 INSTR_TIME_SET_CURRENT(st->stmt_begin);
1067
1068         if (commands[st->state]->type == SQL_COMMAND)
1069         {
1070                 const Command *command = commands[st->state];
1071                 int                     r;
1072
1073                 if (querymode == QUERY_SIMPLE)
1074                 {
1075                         char       *sql;
1076
1077                         sql = pg_strdup(command->argv[0]);
1078                         sql = assignVariables(st, sql);
1079
1080                         if (debug)
1081                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1082                         r = PQsendQuery(st->con, sql);
1083                         free(sql);
1084                 }
1085                 else if (querymode == QUERY_EXTENDED)
1086                 {
1087                         const char *sql = command->argv[0];
1088                         const char *params[MAX_ARGS];
1089
1090                         getQueryParams(st, command, params);
1091
1092                         if (debug)
1093                                 fprintf(stderr, "client %d sending %s\n", st->id, sql);
1094                         r = PQsendQueryParams(st->con, sql, command->argc - 1,
1095                                                                   NULL, params, NULL, NULL, 0);
1096                 }
1097                 else if (querymode == QUERY_PREPARED)
1098                 {
1099                         char            name[MAX_PREPARE_NAME];
1100                         const char *params[MAX_ARGS];
1101
1102                         if (!st->prepared[st->use_file])
1103                         {
1104                                 int                     j;
1105
1106                                 for (j = 0; commands[j] != NULL; j++)
1107                                 {
1108                                         PGresult   *res;
1109                                         char            name[MAX_PREPARE_NAME];
1110
1111                                         if (commands[j]->type != SQL_COMMAND)
1112                                                 continue;
1113                                         preparedStatementName(name, st->use_file, j);
1114                                         res = PQprepare(st->con, name,
1115                                                   commands[j]->argv[0], commands[j]->argc - 1, NULL);
1116                                         if (PQresultStatus(res) != PGRES_COMMAND_OK)
1117                                                 fprintf(stderr, "%s", PQerrorMessage(st->con));
1118                                         PQclear(res);
1119                                 }
1120                                 st->prepared[st->use_file] = true;
1121                         }
1122
1123                         getQueryParams(st, command, params);
1124                         preparedStatementName(name, st->use_file, st->state);
1125
1126                         if (debug)
1127                                 fprintf(stderr, "client %d sending %s\n", st->id, name);
1128                         r = PQsendQueryPrepared(st->con, name, command->argc - 1,
1129                                                                         params, NULL, NULL, 0);
1130                 }
1131                 else    /* unknown sql mode */
1132                         r = 0;
1133
1134                 if (r == 0)
1135                 {
1136                         if (debug)
1137                                 fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
1138                         st->ecnt++;
1139                 }
1140                 else
1141                         st->listen = 1;         /* flags that should be listened */
1142         }
1143         else if (commands[st->state]->type == META_COMMAND)
1144         {
1145                 int                     argc = commands[st->state]->argc,
1146                                         i;
1147                 char      **argv = commands[st->state]->argv;
1148
1149                 if (debug)
1150                 {
1151                         fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
1152                         for (i = 1; i < argc; i++)
1153                                 fprintf(stderr, " %s", argv[i]);
1154                         fprintf(stderr, "\n");
1155                 }
1156
1157                 if (pg_strcasecmp(argv[0], "setrandom") == 0)
1158                 {
1159                         char       *var;
1160                         int64           min,
1161                                                 max;
1162                         char            res[64];
1163
1164                         if (*argv[2] == ':')
1165                         {
1166                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1167                                 {
1168                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1169                                         st->ecnt++;
1170                                         return true;
1171                                 }
1172                                 min = strtoint64(var);
1173                         }
1174                         else
1175                                 min = strtoint64(argv[2]);
1176
1177 #ifdef NOT_USED
1178                         if (min < 0)
1179                         {
1180                                 fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min);
1181                                 st->ecnt++;
1182                                 return;
1183                         }
1184 #endif
1185
1186                         if (*argv[3] == ':')
1187                         {
1188                                 if ((var = getVariable(st, argv[3] + 1)) == NULL)
1189                                 {
1190                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
1191                                         st->ecnt++;
1192                                         return true;
1193                                 }
1194                                 max = strtoint64(var);
1195                         }
1196                         else
1197                                 max = strtoint64(argv[3]);
1198
1199                         if (max < min)
1200                         {
1201                                 fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]);
1202                                 st->ecnt++;
1203                                 return true;
1204                         }
1205
1206                         /*
1207                          * getrand() needs to be able to subtract max from min and add
1208                          * one to the result without overflowing.  Since we know max > min,
1209                          * we can detect overflow just by checking for a negative result.
1210                          * But we must check both that the subtraction doesn't overflow,
1211                          * and that adding one to the result doesn't overflow either.
1212                          */
1213                         if (max - min < 0 || (max - min) + 1 < 0)
1214                         {
1215                                 fprintf(stderr, "%s: range too large\n", argv[0]);
1216                                 st->ecnt++;
1217                                 return true;
1218                         }
1219
1220 #ifdef DEBUG
1221                         printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
1222 #endif
1223                         snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
1224
1225                         if (!putVariable(st, argv[0], argv[1], res))
1226                         {
1227                                 st->ecnt++;
1228                                 return true;
1229                         }
1230
1231                         st->listen = 1;
1232                 }
1233                 else if (pg_strcasecmp(argv[0], "set") == 0)
1234                 {
1235                         char       *var;
1236                         int64           ope1,
1237                                                 ope2;
1238                         char            res[64];
1239
1240                         if (*argv[2] == ':')
1241                         {
1242                                 if ((var = getVariable(st, argv[2] + 1)) == NULL)
1243                                 {
1244                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
1245                                         st->ecnt++;
1246                                         return true;
1247                                 }
1248                                 ope1 = strtoint64(var);
1249                         }
1250                         else
1251                                 ope1 = strtoint64(argv[2]);
1252
1253                         if (argc < 5)
1254                                 snprintf(res, sizeof(res), INT64_FORMAT, ope1);
1255                         else
1256                         {
1257                                 if (*argv[4] == ':')
1258                                 {
1259                                         if ((var = getVariable(st, argv[4] + 1)) == NULL)
1260                                         {
1261                                                 fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
1262                                                 st->ecnt++;
1263                                                 return true;
1264                                         }
1265                                         ope2 = strtoint64(var);
1266                                 }
1267                                 else
1268                                         ope2 = strtoint64(argv[4]);
1269
1270                                 if (strcmp(argv[3], "+") == 0)
1271                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
1272                                 else if (strcmp(argv[3], "-") == 0)
1273                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
1274                                 else if (strcmp(argv[3], "*") == 0)
1275                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
1276                                 else if (strcmp(argv[3], "/") == 0)
1277                                 {
1278                                         if (ope2 == 0)
1279                                         {
1280                                                 fprintf(stderr, "%s: division by zero\n", argv[0]);
1281                                                 st->ecnt++;
1282                                                 return true;
1283                                         }
1284                                         snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
1285                                 }
1286                                 else
1287                                 {
1288                                         fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
1289                                         st->ecnt++;
1290                                         return true;
1291                                 }
1292                         }
1293
1294                         if (!putVariable(st, argv[0], argv[1], res))
1295                         {
1296                                 st->ecnt++;
1297                                 return true;
1298                         }
1299
1300                         st->listen = 1;
1301                 }
1302                 else if (pg_strcasecmp(argv[0], "sleep") == 0)
1303                 {
1304                         char       *var;
1305                         int                     usec;
1306                         instr_time      now;
1307
1308                         if (*argv[1] == ':')
1309                         {
1310                                 if ((var = getVariable(st, argv[1] + 1)) == NULL)
1311                                 {
1312                                         fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
1313                                         st->ecnt++;
1314                                         return true;
1315                                 }
1316                                 usec = atoi(var);
1317                         }
1318                         else
1319                                 usec = atoi(argv[1]);
1320
1321                         if (argc > 2)
1322                         {
1323                                 if (pg_strcasecmp(argv[2], "ms") == 0)
1324                                         usec *= 1000;
1325                                 else if (pg_strcasecmp(argv[2], "s") == 0)
1326                                         usec *= 1000000;
1327                         }
1328                         else
1329                                 usec *= 1000000;
1330
1331                         INSTR_TIME_SET_CURRENT(now);
1332                         st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
1333                         st->sleeping = 1;
1334
1335                         st->listen = 1;
1336                 }
1337                 else if (pg_strcasecmp(argv[0], "setshell") == 0)
1338                 {
1339                         bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
1340
1341                         if (timer_exceeded) /* timeout */
1342                                 return clientDone(st, true);
1343                         else if (!ret)          /* on error */
1344                         {
1345                                 st->ecnt++;
1346                                 return true;
1347                         }
1348                         else    /* succeeded */
1349                                 st->listen = 1;
1350                 }
1351                 else if (pg_strcasecmp(argv[0], "shell") == 0)
1352                 {
1353                         bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
1354
1355                         if (timer_exceeded) /* timeout */
1356                                 return clientDone(st, true);
1357                         else if (!ret)          /* on error */
1358                         {
1359                                 st->ecnt++;
1360                                 return true;
1361                         }
1362                         else    /* succeeded */
1363                                 st->listen = 1;
1364                 }
1365                 goto top;
1366         }
1367
1368         return true;
1369 }
1370
1371 /* discard connections */
1372 static void
1373 disconnect_all(CState *state, int length)
1374 {
1375         int                     i;
1376
1377         for (i = 0; i < length; i++)
1378         {
1379                 if (state[i].con)
1380                 {
1381                         PQfinish(state[i].con);
1382                         state[i].con = NULL;
1383                 }
1384         }
1385 }
1386
1387 /* create tables and setup data */
1388 static void
1389 init(bool is_no_vacuum)
1390 {
1391
1392 /* The scale factor at/beyond which 32-bit integers can no longer hold
1393  * account ids (aid ranges up to naccounts * scale), so aid must be bigint.
1394  *
1395  * Although the actual threshold is 21474, we use 20000 because it is easier to
1396  * document and remember, and isn't that far away from the real threshold.
1397  */
1398 #define SCALE_32BIT_THRESHOLD 20000
1399
1400         /*
1401          * Note: TPC-B requires at least 100 bytes per row, and the "filler"
1402          * fields in these table declarations were intended to comply with that.
1403          * But because they default to NULLs, they don't actually take any space.
1404          * We could fix that by giving them non-null default values. However, that
1405          * would completely break comparability of pgbench results with prior
1406          * versions.  Since pgbench has never pretended to be fully TPC-B
1407          * compliant anyway, we stick with the historical behavior.
1408          */
1409         struct ddlinfo
1410         {
1411                 char       *table;
1412                 char       *cols;
1413                 int                     declare_fillfactor;
1414         };
1415         struct ddlinfo DDLs[] = {
1416                 {
1417                         "pgbench_history",
1418                         scale >= SCALE_32BIT_THRESHOLD
1419                                 ? "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)"
1420                                 : "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
1421                         0
1422                 },
1423                 {
1424                         "pgbench_tellers",
1425                         "tid int not null,bid int,tbalance int,filler char(84)",
1426                         1
1427                 },
1428                 {
1429                         "pgbench_accounts",
1430                         scale >= SCALE_32BIT_THRESHOLD
1431                                 ? "aid bigint not null,bid int,abalance int,filler char(84)"
1432                                 : "aid    int not null,bid int,abalance int,filler char(84)",
1433                         1
1434                 },
1435                 {
1436                         "pgbench_branches",
1437                         "bid int not null,bbalance int,filler char(88)",
1438                         1
1439                 }
1440         };
1441         static char *DDLAFTERs[] = {
1442                 "alter table pgbench_branches add primary key (bid)",
1443                 "alter table pgbench_tellers add primary key (tid)",
1444                 "alter table pgbench_accounts add primary key (aid)"
1445         };
1446         static char *DDLKEYs[] = {
1447                 "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
1448                 "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
1449                 "alter table pgbench_history add foreign key (bid) references pgbench_branches",
1450                 "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
1451                 "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
1452         };
1453
1454         PGconn     *con;
1455         PGresult   *res;
1456         char            sql[256];
1457         int                     i;
1458         int64           k;
1459
1460         /* used to track elapsed time and estimate of the remaining time */
1461         instr_time      start, diff;
1462         double          elapsed_sec, remaining_sec;
1463         int                     log_interval = 1;
1464
1465         if ((con = doConnect()) == NULL)
1466                 exit(1);
1467
1468         for (i = 0; i < lengthof(DDLs); i++)
1469         {
1470                 char            opts[256];
1471                 char            buffer[256];
1472                 struct ddlinfo *ddl = &DDLs[i];
1473
1474                 /* Remove old table, if it exists. */
1475                 snprintf(buffer, 256, "drop table if exists %s", ddl->table);
1476                 executeStatement(con, buffer);
1477
1478                 /* Construct new create table statement. */
1479                 opts[0] = '\0';
1480                 if (ddl->declare_fillfactor)
1481                         snprintf(opts + strlen(opts), 256 - strlen(opts),
1482                                          " with (fillfactor=%d)", fillfactor);
1483                 if (tablespace != NULL)
1484                 {
1485                         char       *escape_tablespace;
1486
1487                         escape_tablespace = PQescapeIdentifier(con, tablespace,
1488                                                                                                    strlen(tablespace));
1489                         snprintf(opts + strlen(opts), 256 - strlen(opts),
1490                                          " tablespace %s", escape_tablespace);
1491                         PQfreemem(escape_tablespace);
1492                 }
1493                 snprintf(buffer, 256, "create%s table %s(%s)%s",
1494                                  unlogged_tables ? " unlogged" : "",
1495                                  ddl->table, ddl->cols, opts);
1496
1497                 executeStatement(con, buffer);
1498         }
1499
1500         executeStatement(con, "begin");
1501
1502         for (i = 0; i < nbranches * scale; i++)
1503         {
1504                 snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
1505                 executeStatement(con, sql);
1506         }
1507
1508         for (i = 0; i < ntellers * scale; i++)
1509         {
1510                 snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
1511                                  i + 1, i / ntellers + 1);
1512                 executeStatement(con, sql);
1513         }
1514
1515         executeStatement(con, "commit");
1516
1517         /*
1518          * fill the pgbench_accounts table with some data
1519          */
1520         fprintf(stderr, "creating tables...\n");
1521
1522         executeStatement(con, "begin");
1523         executeStatement(con, "truncate pgbench_accounts");
1524
1525         res = PQexec(con, "copy pgbench_accounts from stdin");
1526         if (PQresultStatus(res) != PGRES_COPY_IN)
1527         {
1528                 fprintf(stderr, "%s", PQerrorMessage(con));
1529                 exit(1);
1530         }
1531         PQclear(res);
1532
1533         INSTR_TIME_SET_CURRENT(start);
1534
1535         for (k = 0; k < (int64) naccounts * scale; k++)
1536         {
1537                 int64           j = k + 1;
1538
1539                 snprintf(sql, 256, INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", j, k / naccounts + 1, 0);
1540                 if (PQputline(con, sql))
1541                 {
1542                         fprintf(stderr, "PQputline failed\n");
1543                         exit(1);
1544                 }
1545
1546                 /* If we want to stick with the original logging, print a message each
1547                  * 100k inserted rows. */
1548                 if ((! use_quiet) && (j % 100000 == 0))
1549                 {
1550                         INSTR_TIME_SET_CURRENT(diff);
1551                         INSTR_TIME_SUBTRACT(diff, start);
1552
1553                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1554                         remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1555
1556                         fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1557                                                         j, (int64)naccounts * scale,
1558                                                         (int) (((int64) j * 100) / (naccounts * scale)),
1559                                                         elapsed_sec, remaining_sec);
1560                 }
1561                 /* let's not call the timing for each row, but only each 100 rows */
1562                 else if (use_quiet && (j % 100 == 0))
1563                 {
1564                         INSTR_TIME_SET_CURRENT(diff);
1565                         INSTR_TIME_SUBTRACT(diff, start);
1566
1567                         elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
1568                         remaining_sec = (scale * naccounts - j) * elapsed_sec / j;
1569
1570                         /* have we reached the next interval (or end)? */
1571                         if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) {
1572
1573                                 fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
1574                                                 j, (int64)naccounts * scale,
1575                                                 (int) (((int64) j * 100) / (naccounts * scale)), elapsed_sec, remaining_sec);
1576
1577                                 /* skip to the next interval */
1578                                 log_interval = (int)ceil(elapsed_sec/LOG_STEP_SECONDS);
1579                         }
1580                 }
1581
1582         }
1583         if (PQputline(con, "\\.\n"))
1584         {
1585                 fprintf(stderr, "very last PQputline failed\n");
1586                 exit(1);
1587         }
1588         if (PQendcopy(con))
1589         {
1590                 fprintf(stderr, "PQendcopy failed\n");
1591                 exit(1);
1592         }
1593         executeStatement(con, "commit");
1594
1595         /* vacuum */
1596         if (!is_no_vacuum)
1597         {
1598                 fprintf(stderr, "vacuum...\n");
1599                 executeStatement(con, "vacuum analyze pgbench_branches");
1600                 executeStatement(con, "vacuum analyze pgbench_tellers");
1601                 executeStatement(con, "vacuum analyze pgbench_accounts");
1602                 executeStatement(con, "vacuum analyze pgbench_history");
1603         }
1604
1605         /*
1606          * create indexes
1607          */
1608         fprintf(stderr, "set primary keys...\n");
1609         for (i = 0; i < lengthof(DDLAFTERs); i++)
1610         {
1611                 char            buffer[256];
1612
1613                 strncpy(buffer, DDLAFTERs[i], 256);
1614
1615                 if (index_tablespace != NULL)
1616                 {
1617                         char       *escape_tablespace;
1618
1619                         escape_tablespace = PQescapeIdentifier(con, index_tablespace,
1620                                                                                                    strlen(index_tablespace));
1621                         snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
1622                                          " using index tablespace %s", escape_tablespace);
1623                         PQfreemem(escape_tablespace);
1624                 }
1625
1626                 executeStatement(con, buffer);
1627         }
1628
1629         /*
1630          * create foreign keys
1631          */
1632         if (foreign_keys)
1633         {
1634                 fprintf(stderr, "set foreign keys...\n");
1635                 for (i = 0; i < lengthof(DDLKEYs); i++)
1636                 {
1637                         executeStatement(con, DDLKEYs[i]);
1638                 }
1639         }
1640
1641
1642         fprintf(stderr, "done.\n");
1643         PQfinish(con);
1644 }
1645
1646 /*
1647  * Parse the raw sql and replace :param to $n.
1648  */
1649 static bool
1650 parseQuery(Command *cmd, const char *raw_sql)
1651 {
1652         char       *sql,
1653                            *p;
1654
1655         sql = pg_strdup(raw_sql);
1656         cmd->argc = 1;
1657
1658         p = sql;
1659         while ((p = strchr(p, ':')) != NULL)
1660         {
1661                 char            var[12];
1662                 char       *name;
1663                 int                     eaten;
1664
1665                 name = parseVariable(p, &eaten);
1666                 if (name == NULL)
1667                 {
1668                         while (*p == ':')
1669                         {
1670                                 p++;
1671                         }
1672                         continue;
1673                 }
1674
1675                 if (cmd->argc >= MAX_ARGS)
1676                 {
1677                         fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql);
1678                         return false;
1679                 }
1680
1681                 sprintf(var, "$%d", cmd->argc);
1682                 p = replaceVariable(&sql, p, eaten, var);
1683
1684                 cmd->argv[cmd->argc] = name;
1685                 cmd->argc++;
1686         }
1687
1688         cmd->argv[0] = sql;
1689         return true;
1690 }
1691
1692 /* Parse a command; return a Command struct, or NULL if it's a comment */
1693 static Command *
1694 process_commands(char *buf)
1695 {
1696         const char      delim[] = " \f\n\r\t\v";
1697
1698         Command    *my_commands;
1699         int                     j;
1700         char       *p,
1701                            *tok;
1702
1703         /* Make the string buf end at the next newline */
1704         if ((p = strchr(buf, '\n')) != NULL)
1705                 *p = '\0';
1706
1707         /* Skip leading whitespace */
1708         p = buf;
1709         while (isspace((unsigned char) *p))
1710                 p++;
1711
1712         /* If the line is empty or actually a comment, we're done */
1713         if (*p == '\0' || strncmp(p, "--", 2) == 0)
1714                 return NULL;
1715
1716         /* Allocate and initialize Command structure */
1717         my_commands = (Command *) pg_malloc(sizeof(Command));
1718         my_commands->line = pg_strdup(buf);
1719         my_commands->command_num = num_commands++;
1720         my_commands->type = 0;          /* until set */
1721         my_commands->argc = 0;
1722
1723         if (*p == '\\')
1724         {
1725                 my_commands->type = META_COMMAND;
1726
1727                 j = 0;
1728                 tok = strtok(++p, delim);
1729
1730                 while (tok != NULL)
1731                 {
1732                         my_commands->argv[j++] = pg_strdup(tok);
1733                         my_commands->argc++;
1734                         tok = strtok(NULL, delim);
1735                 }
1736
1737                 if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0)
1738                 {
1739                         if (my_commands->argc < 4)
1740                         {
1741                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1742                                 exit(1);
1743                         }
1744
1745                         for (j = 4; j < my_commands->argc; j++)
1746                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1747                                                 my_commands->argv[0], my_commands->argv[j]);
1748                 }
1749                 else if (pg_strcasecmp(my_commands->argv[0], "set") == 0)
1750                 {
1751                         if (my_commands->argc < 3)
1752                         {
1753                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1754                                 exit(1);
1755                         }
1756
1757                         for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++)
1758                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1759                                                 my_commands->argv[0], my_commands->argv[j]);
1760                 }
1761                 else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0)
1762                 {
1763                         if (my_commands->argc < 2)
1764                         {
1765                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1766                                 exit(1);
1767                         }
1768
1769                         /*
1770                          * Split argument into number and unit to allow "sleep 1ms" etc.
1771                          * We don't have to terminate the number argument with null
1772                          * because it will be parsed with atoi, which ignores trailing
1773                          * non-digit characters.
1774                          */
1775                         if (my_commands->argv[1][0] != ':')
1776                         {
1777                                 char       *c = my_commands->argv[1];
1778
1779                                 while (isdigit((unsigned char) *c))
1780                                         c++;
1781                                 if (*c)
1782                                 {
1783                                         my_commands->argv[2] = c;
1784                                         if (my_commands->argc < 3)
1785                                                 my_commands->argc = 3;
1786                                 }
1787                         }
1788
1789                         if (my_commands->argc >= 3)
1790                         {
1791                                 if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
1792                                         pg_strcasecmp(my_commands->argv[2], "ms") != 0 &&
1793                                         pg_strcasecmp(my_commands->argv[2], "s") != 0)
1794                                 {
1795                                         fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n",
1796                                                         my_commands->argv[0], my_commands->argv[2]);
1797                                         exit(1);
1798                                 }
1799                         }
1800
1801                         for (j = 3; j < my_commands->argc; j++)
1802                                 fprintf(stderr, "%s: extra argument \"%s\" ignored\n",
1803                                                 my_commands->argv[0], my_commands->argv[j]);
1804                 }
1805                 else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0)
1806                 {
1807                         if (my_commands->argc < 3)
1808                         {
1809                                 fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]);
1810                                 exit(1);
1811                         }
1812                 }
1813                 else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0)
1814                 {
1815                         if (my_commands->argc < 1)
1816                         {
1817                                 fprintf(stderr, "%s: missing command\n", my_commands->argv[0]);
1818                                 exit(1);
1819                         }
1820                 }
1821                 else
1822                 {
1823                         fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]);
1824                         exit(1);
1825                 }
1826         }
1827         else
1828         {
1829                 my_commands->type = SQL_COMMAND;
1830
1831                 switch (querymode)
1832                 {
1833                         case QUERY_SIMPLE:
1834                                 my_commands->argv[0] = pg_strdup(p);
1835                                 my_commands->argc++;
1836                                 break;
1837                         case QUERY_EXTENDED:
1838                         case QUERY_PREPARED:
1839                                 if (!parseQuery(my_commands, p))
1840                                         exit(1);
1841                                 break;
1842                         default:
1843                                 exit(1);
1844                 }
1845         }
1846
1847         return my_commands;
1848 }
1849
1850 static int
1851 process_file(char *filename)
1852 {
1853 #define COMMANDS_ALLOC_NUM 128
1854
1855         Command   **my_commands;
1856         FILE       *fd;
1857         int                     lineno;
1858         char            buf[BUFSIZ];
1859         int                     alloc_num;
1860
1861         if (num_files >= MAX_FILES)
1862         {
1863                 fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES);
1864                 exit(1);
1865         }
1866
1867         alloc_num = COMMANDS_ALLOC_NUM;
1868         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1869
1870         if (strcmp(filename, "-") == 0)
1871                 fd = stdin;
1872         else if ((fd = fopen(filename, "r")) == NULL)
1873         {
1874                 fprintf(stderr, "%s: %s\n", filename, strerror(errno));
1875                 return false;
1876         }
1877
1878         lineno = 0;
1879
1880         while (fgets(buf, sizeof(buf), fd) != NULL)
1881         {
1882                 Command    *command;
1883
1884                 command = process_commands(buf);
1885                 if (command == NULL)
1886                         continue;
1887
1888                 my_commands[lineno] = command;
1889                 lineno++;
1890
1891                 if (lineno >= alloc_num)
1892                 {
1893                         alloc_num += COMMANDS_ALLOC_NUM;
1894                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1895                 }
1896         }
1897         fclose(fd);
1898
1899         my_commands[lineno] = NULL;
1900
1901         sql_files[num_files++] = my_commands;
1902
1903         return true;
1904 }
1905
1906 static Command **
1907 process_builtin(char *tb)
1908 {
1909 #define COMMANDS_ALLOC_NUM 128
1910
1911         Command   **my_commands;
1912         int                     lineno;
1913         char            buf[BUFSIZ];
1914         int                     alloc_num;
1915
1916         alloc_num = COMMANDS_ALLOC_NUM;
1917         my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
1918
1919         lineno = 0;
1920
1921         for (;;)
1922         {
1923                 char       *p;
1924                 Command    *command;
1925
1926                 p = buf;
1927                 while (*tb && *tb != '\n')
1928                         *p++ = *tb++;
1929
1930                 if (*tb == '\0')
1931                         break;
1932
1933                 if (*tb == '\n')
1934                         tb++;
1935
1936                 *p = '\0';
1937
1938                 command = process_commands(buf);
1939                 if (command == NULL)
1940                         continue;
1941
1942                 my_commands[lineno] = command;
1943                 lineno++;
1944
1945                 if (lineno >= alloc_num)
1946                 {
1947                         alloc_num += COMMANDS_ALLOC_NUM;
1948                         my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
1949                 }
1950         }
1951
1952         my_commands[lineno] = NULL;
1953
1954         return my_commands;
1955 }
1956
1957 /* print out results */
1958 static void
1959 printResults(int ttype, int normal_xacts, int nclients,
1960                          TState *threads, int nthreads,
1961                          instr_time total_time, instr_time conn_total_time)
1962 {
1963         double          time_include,
1964                                 tps_include,
1965                                 tps_exclude;
1966         char       *s;
1967
1968         time_include = INSTR_TIME_GET_DOUBLE(total_time);
1969         tps_include = normal_xacts / time_include;
1970         tps_exclude = normal_xacts / (time_include -
1971                                                 (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
1972
1973         if (ttype == 0)
1974                 s = "TPC-B (sort of)";
1975         else if (ttype == 2)
1976                 s = "Update only pgbench_accounts";
1977         else if (ttype == 1)
1978                 s = "SELECT only";
1979         else
1980                 s = "Custom query";
1981
1982         printf("transaction type: %s\n", s);
1983         printf("scaling factor: %d\n", scale);
1984         printf("query mode: %s\n", QUERYMODE[querymode]);
1985         printf("number of clients: %d\n", nclients);
1986         printf("number of threads: %d\n", nthreads);
1987         if (duration <= 0)
1988         {
1989                 printf("number of transactions per client: %d\n", nxacts);
1990                 printf("number of transactions actually processed: %d/%d\n",
1991                            normal_xacts, nxacts * nclients);
1992         }
1993         else
1994         {
1995                 printf("duration: %d s\n", duration);
1996                 printf("number of transactions actually processed: %d\n",
1997                            normal_xacts);
1998         }
1999         printf("tps = %f (including connections establishing)\n", tps_include);
2000         printf("tps = %f (excluding connections establishing)\n", tps_exclude);
2001
2002         /* Report per-command latencies */
2003         if (is_latencies)
2004         {
2005                 int                     i;
2006
2007                 for (i = 0; i < num_files; i++)
2008                 {
2009                         Command   **commands;
2010
2011                         if (num_files > 1)
2012                                 printf("statement latencies in milliseconds, file %d:\n", i + 1);
2013                         else
2014                                 printf("statement latencies in milliseconds:\n");
2015
2016                         for (commands = sql_files[i]; *commands != NULL; commands++)
2017                         {
2018                                 Command    *command = *commands;
2019                                 int                     cnum = command->command_num;
2020                                 double          total_time;
2021                                 instr_time      total_exec_elapsed;
2022                                 int                     total_exec_count;
2023                                 int                     t;
2024
2025                                 /* Accumulate per-thread data for command */
2026                                 INSTR_TIME_SET_ZERO(total_exec_elapsed);
2027                                 total_exec_count = 0;
2028                                 for (t = 0; t < nthreads; t++)
2029                                 {
2030                                         TState     *thread = &threads[t];
2031
2032                                         INSTR_TIME_ADD(total_exec_elapsed,
2033                                                                    thread->exec_elapsed[cnum]);
2034                                         total_exec_count += thread->exec_count[cnum];
2035                                 }
2036
2037                                 if (total_exec_count > 0)
2038                                         total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
2039                                 else
2040                                         total_time = 0.0;
2041
2042                                 printf("\t%f\t%s\n", total_time, command->line);
2043                         }
2044                 }
2045         }
2046 }
2047
2048
2049 int
2050 main(int argc, char **argv)
2051 {
2052         static struct option long_options[] = {
2053                 {"foreign-keys", no_argument, &foreign_keys, 1},
2054                 {"index-tablespace", required_argument, NULL, 3},
2055                 {"tablespace", required_argument, NULL, 2},
2056                 {"unlogged-tables", no_argument, &unlogged_tables, 1},
2057                 {"sampling-rate", required_argument, NULL, 4},
2058                 {NULL, 0, NULL, 0}
2059         };
2060
2061         int                     c;
2062         int                     nclients = 1;   /* default number of simulated clients */
2063         int                     nthreads = 1;   /* default number of threads */
2064         int                     is_init_mode = 0;               /* initialize mode? */
2065         int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
2066         int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
2067         int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
2068                                                                  * 2: skip update of branches and tellers */
2069         int                     optindex;
2070         char       *filename = NULL;
2071         bool            scale_given = false;
2072
2073         CState     *state;                      /* status of clients */
2074         TState     *threads;            /* array of thread */
2075
2076         instr_time      start_time;             /* start up time */
2077         instr_time      total_time;
2078         instr_time      conn_total_time;
2079         int                     total_xacts;
2080
2081         int                     i;
2082
2083 #ifdef HAVE_GETRLIMIT
2084         struct rlimit rlim;
2085 #endif
2086
2087         PGconn     *con;
2088         PGresult   *res;
2089         char       *env;
2090
2091         char            val[64];
2092
2093         progname = get_progname(argv[0]);
2094
2095         if (argc > 1)
2096         {
2097                 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2098                 {
2099                         usage();
2100                         exit(0);
2101                 }
2102                 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
2103                 {
2104                         puts("pgbench (PostgreSQL) " PG_VERSION);
2105                         exit(0);
2106                 }
2107         }
2108
2109 #ifdef WIN32
2110         /* stderr is buffered on Win32. */
2111         setvbuf(stderr, NULL, _IONBF, 0);
2112 #endif
2113
2114         if ((env = getenv("PGHOST")) != NULL && *env != '\0')
2115                 pghost = env;
2116         if ((env = getenv("PGPORT")) != NULL && *env != '\0')
2117                 pgport = env;
2118         else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
2119                 login = env;
2120
2121         state = (CState *) pg_malloc(sizeof(CState));
2122         memset(state, 0, sizeof(CState));
2123
2124         while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
2125         {
2126                 switch (c)
2127                 {
2128                         case 'i':
2129                                 is_init_mode++;
2130                                 break;
2131                         case 'h':
2132                                 pghost = pg_strdup(optarg);
2133                                 break;
2134                         case 'n':
2135                                 is_no_vacuum++;
2136                                 break;
2137                         case 'v':
2138                                 do_vacuum_accounts++;
2139                                 break;
2140                         case 'p':
2141                                 pgport = pg_strdup(optarg);
2142                                 break;
2143                         case 'd':
2144                                 debug++;
2145                                 break;
2146                         case 'S':
2147                                 ttype = 1;
2148                                 break;
2149                         case 'N':
2150                                 ttype = 2;
2151                                 break;
2152                         case 'c':
2153                                 nclients = atoi(optarg);
2154                                 if (nclients <= 0 || nclients > MAXCLIENTS)
2155                                 {
2156                                         fprintf(stderr, "invalid number of clients: %d\n", nclients);
2157                                         exit(1);
2158                                 }
2159 #ifdef HAVE_GETRLIMIT
2160 #ifdef RLIMIT_NOFILE                    /* most platforms use RLIMIT_NOFILE */
2161                                 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
2162 #else                                                   /* but BSD doesn't ... */
2163                                 if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
2164 #endif   /* RLIMIT_NOFILE */
2165                                 {
2166                                         fprintf(stderr, "getrlimit failed: %s\n", strerror(errno));
2167                                         exit(1);
2168                                 }
2169                                 if (rlim.rlim_cur <= (nclients + 2))
2170                                 {
2171                                         fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur);
2172                                         fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n");
2173                                         exit(1);
2174                                 }
2175 #endif   /* HAVE_GETRLIMIT */
2176                                 break;
2177                         case 'j':                       /* jobs */
2178                                 nthreads = atoi(optarg);
2179                                 if (nthreads <= 0)
2180                                 {
2181                                         fprintf(stderr, "invalid number of threads: %d\n", nthreads);
2182                                         exit(1);
2183                                 }
2184                                 break;
2185                         case 'C':
2186                                 is_connect = true;
2187                                 break;
2188                         case 'r':
2189                                 is_latencies = true;
2190                                 break;
2191                         case 's':
2192                                 scale_given = true;
2193                                 scale = atoi(optarg);
2194                                 if (scale <= 0)
2195                                 {
2196                                         fprintf(stderr, "invalid scaling factor: %d\n", scale);
2197                                         exit(1);
2198                                 }
2199                                 break;
2200                         case 't':
2201                                 if (duration > 0)
2202                                 {
2203                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2204                                         exit(1);
2205                                 }
2206                                 nxacts = atoi(optarg);
2207                                 if (nxacts <= 0)
2208                                 {
2209                                         fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
2210                                         exit(1);
2211                                 }
2212                                 break;
2213                         case 'T':
2214                                 if (nxacts > 0)
2215                                 {
2216                                         fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n");
2217                                         exit(1);
2218                                 }
2219                                 duration = atoi(optarg);
2220                                 if (duration <= 0)
2221                                 {
2222                                         fprintf(stderr, "invalid duration: %d\n", duration);
2223                                         exit(1);
2224                                 }
2225                                 break;
2226                         case 'U':
2227                                 login = pg_strdup(optarg);
2228                                 break;
2229                         case 'l':
2230                                 use_log = true;
2231                                 break;
2232                         case 'q':
2233                                 use_quiet = true;
2234                                 break;
2235                         case 'f':
2236                                 ttype = 3;
2237                                 filename = pg_strdup(optarg);
2238                                 if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
2239                                         exit(1);
2240                                 break;
2241                         case 'D':
2242                                 {
2243                                         char       *p;
2244
2245                                         if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
2246                                         {
2247                                                 fprintf(stderr, "invalid variable definition: %s\n", optarg);
2248                                                 exit(1);
2249                                         }
2250
2251                                         *p++ = '\0';
2252                                         if (!putVariable(&state[0], "option", optarg, p))
2253                                                 exit(1);
2254                                 }
2255                                 break;
2256                         case 'F':
2257                                 fillfactor = atoi(optarg);
2258                                 if ((fillfactor < 10) || (fillfactor > 100))
2259                                 {
2260                                         fprintf(stderr, "invalid fillfactor: %d\n", fillfactor);
2261                                         exit(1);
2262                                 }
2263                                 break;
2264                         case 'M':
2265                                 if (num_files > 0)
2266                                 {
2267                                         fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n");
2268                                         exit(1);
2269                                 }
2270                                 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
2271                                         if (strcmp(optarg, QUERYMODE[querymode]) == 0)
2272                                                 break;
2273                                 if (querymode >= NUM_QUERYMODE)
2274                                 {
2275                                         fprintf(stderr, "invalid query mode (-M): %s\n", optarg);
2276                                         exit(1);
2277                                 }
2278                                 break;
2279                         case 0:
2280                                 /* This covers long options which take no argument. */
2281                                 break;
2282                         case 2:                         /* tablespace */
2283                                 tablespace = pg_strdup(optarg);
2284                                 break;
2285                         case 3:                         /* index-tablespace */
2286                                 index_tablespace = pg_strdup(optarg);
2287                                 break;
2288                         case 4:
2289                                 sample_rate = atof(optarg);
2290                                 if (sample_rate <= 0.0 || sample_rate > 1.0)
2291                                 {
2292                                         fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
2293                                         exit(1);
2294                                 }
2295                                 break;
2296                         default:
2297                                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
2298                                 exit(1);
2299                                 break;
2300                 }
2301         }
2302
2303         if (argc > optind)
2304                 dbName = argv[optind];
2305         else
2306         {
2307                 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
2308                         dbName = env;
2309                 else if (login != NULL && *login != '\0')
2310                         dbName = login;
2311                 else
2312                         dbName = "";
2313         }
2314
2315         if (is_init_mode)
2316         {
2317                 init(is_no_vacuum);
2318                 exit(0);
2319         }
2320
2321         /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
2322         if (nxacts <= 0 && duration <= 0)
2323                 nxacts = DEFAULT_NXACTS;
2324
2325         if (nclients % nthreads != 0)
2326         {
2327                 fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads);
2328                 exit(1);
2329         }
2330
2331         /* --sampling-rate may be used only with -l */
2332         if (sample_rate > 0.0 && !use_log)
2333         {
2334                 fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
2335                 exit(1);
2336         }
2337
2338         /* -q may be used only with -i */
2339         if (use_quiet && !is_init_mode)
2340         {
2341                 fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
2342                 exit(1);
2343         }
2344
2345         /*
2346          * is_latencies only works with multiple threads in thread-based
2347          * implementations, not fork-based ones, because it supposes that the
2348          * parent can see changes made to the per-thread execution stats by child
2349          * threads.  It seems useful enough to accept despite this limitation, but
2350          * perhaps we should FIXME someday (by passing the stats data back up
2351          * through the parent-to-child pipes).
2352          */
2353 #ifndef ENABLE_THREAD_SAFETY
2354         if (is_latencies && nthreads > 1)
2355         {
2356                 fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
2357                 exit(1);
2358         }
2359 #endif
2360
2361         /*
2362          * save main process id in the global variable because process id will be
2363          * changed after fork.
2364          */
2365         main_pid = (int) getpid();
2366
2367         if (nclients > 1)
2368         {
2369                 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
2370                 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
2371
2372                 /* copy any -D switch values to all clients */
2373                 for (i = 1; i < nclients; i++)
2374                 {
2375                         int                     j;
2376
2377                         state[i].id = i;
2378                         for (j = 0; j < state[0].nvariables; j++)
2379                         {
2380                                 if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value))
2381                                         exit(1);
2382                         }
2383                 }
2384         }
2385
2386         if (debug)
2387         {
2388                 if (duration <= 0)
2389                         printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
2390                                    pghost, pgport, nclients, nxacts, dbName);
2391                 else
2392                         printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n",
2393                                    pghost, pgport, nclients, duration, dbName);
2394         }
2395
2396         /* opening connection... */
2397         con = doConnect();
2398         if (con == NULL)
2399                 exit(1);
2400
2401         if (PQstatus(con) == CONNECTION_BAD)
2402         {
2403                 fprintf(stderr, "Connection to database '%s' failed.\n", dbName);
2404                 fprintf(stderr, "%s", PQerrorMessage(con));
2405                 exit(1);
2406         }
2407
2408         if (ttype != 3)
2409         {
2410                 /*
2411                  * get the scaling factor that should be same as count(*) from
2412                  * pgbench_branches if this is not a custom query
2413                  */
2414                 res = PQexec(con, "select count(*) from pgbench_branches");
2415                 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2416                 {
2417                         fprintf(stderr, "%s", PQerrorMessage(con));
2418                         exit(1);
2419                 }
2420                 scale = atoi(PQgetvalue(res, 0, 0));
2421                 if (scale < 0)
2422                 {
2423                         fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale);
2424                         exit(1);
2425                 }
2426                 PQclear(res);
2427
2428                 /* warn if we override user-given -s switch */
2429                 if (scale_given)
2430                         fprintf(stderr,
2431                         "Scale option ignored, using pgbench_branches table count = %d\n",
2432                                         scale);
2433         }
2434
2435         /*
2436          * :scale variables normally get -s or database scale, but don't override
2437          * an explicit -D switch
2438          */
2439         if (getVariable(&state[0], "scale") == NULL)
2440         {
2441                 snprintf(val, sizeof(val), "%d", scale);
2442                 for (i = 0; i < nclients; i++)
2443                 {
2444                         if (!putVariable(&state[i], "startup", "scale", val))
2445                                 exit(1);
2446                 }
2447         }
2448
2449         if (!is_no_vacuum)
2450         {
2451                 fprintf(stderr, "starting vacuum...");
2452                 executeStatement(con, "vacuum pgbench_branches");
2453                 executeStatement(con, "vacuum pgbench_tellers");
2454                 executeStatement(con, "truncate pgbench_history");
2455                 fprintf(stderr, "end.\n");
2456
2457                 if (do_vacuum_accounts)
2458                 {
2459                         fprintf(stderr, "starting vacuum pgbench_accounts...");
2460                         executeStatement(con, "vacuum analyze pgbench_accounts");
2461                         fprintf(stderr, "end.\n");
2462                 }
2463         }
2464         PQfinish(con);
2465
2466         /* set random seed */
2467         INSTR_TIME_SET_CURRENT(start_time);
2468         srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
2469
2470         /* process builtin SQL scripts */
2471         switch (ttype)
2472         {
2473                 case 0:
2474                         sql_files[0] = process_builtin(tpc_b);
2475                         num_files = 1;
2476                         break;
2477
2478                 case 1:
2479                         sql_files[0] = process_builtin(select_only);
2480                         num_files = 1;
2481                         break;
2482
2483                 case 2:
2484                         sql_files[0] = process_builtin(simple_update);
2485                         num_files = 1;
2486                         break;
2487
2488                 default:
2489                         break;
2490         }
2491
2492         /* set up thread data structures */
2493         threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
2494         for (i = 0; i < nthreads; i++)
2495         {
2496                 TState     *thread = &threads[i];
2497
2498                 thread->tid = i;
2499                 thread->state = &state[nclients / nthreads * i];
2500                 thread->nstate = nclients / nthreads;
2501                 thread->random_state[0] = random();
2502                 thread->random_state[1] = random();
2503                 thread->random_state[2] = random();
2504
2505                 if (is_latencies)
2506                 {
2507                         /* Reserve memory for the thread to store per-command latencies */
2508                         int                     t;
2509
2510                         thread->exec_elapsed = (instr_time *)
2511                                 pg_malloc(sizeof(instr_time) * num_commands);
2512                         thread->exec_count = (int *)
2513                                 pg_malloc(sizeof(int) * num_commands);
2514
2515                         for (t = 0; t < num_commands; t++)
2516                         {
2517                                 INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
2518                                 thread->exec_count[t] = 0;
2519                         }
2520                 }
2521                 else
2522                 {
2523                         thread->exec_elapsed = NULL;
2524                         thread->exec_count = NULL;
2525                 }
2526         }
2527
2528         /* get start up time */
2529         INSTR_TIME_SET_CURRENT(start_time);
2530
2531         /* set alarm if duration is specified. */
2532         if (duration > 0)
2533                 setalarm(duration);
2534
2535         /* start threads */
2536         for (i = 0; i < nthreads; i++)
2537         {
2538                 TState     *thread = &threads[i];
2539
2540                 INSTR_TIME_SET_CURRENT(thread->start_time);
2541
2542                 /* the first thread (i = 0) is executed by main thread */
2543                 if (i > 0)
2544                 {
2545                         int                     err = pthread_create(&thread->thread, NULL, threadRun, thread);
2546
2547                         if (err != 0 || thread->thread == INVALID_THREAD)
2548                         {
2549                                 fprintf(stderr, "cannot create thread: %s\n", strerror(err));
2550                                 exit(1);
2551                         }
2552                 }
2553                 else
2554                 {
2555                         thread->thread = INVALID_THREAD;
2556                 }
2557         }
2558
2559         /* wait for threads and accumulate results */
2560         total_xacts = 0;
2561         INSTR_TIME_SET_ZERO(conn_total_time);
2562         for (i = 0; i < nthreads; i++)
2563         {
2564                 void       *ret = NULL;
2565
2566                 if (threads[i].thread == INVALID_THREAD)
2567                         ret = threadRun(&threads[i]);
2568                 else
2569                         pthread_join(threads[i].thread, &ret);
2570
2571                 if (ret != NULL)
2572                 {
2573                         TResult    *r = (TResult *) ret;
2574
2575                         total_xacts += r->xacts;
2576                         INSTR_TIME_ADD(conn_total_time, r->conn_time);
2577                         free(ret);
2578                 }
2579         }
2580         disconnect_all(state, nclients);
2581
2582         /* get end time */
2583         INSTR_TIME_SET_CURRENT(total_time);
2584         INSTR_TIME_SUBTRACT(total_time, start_time);
2585         printResults(ttype, total_xacts, nclients, threads, nthreads,
2586                                  total_time, conn_total_time);
2587
2588         return 0;
2589 }
2590
2591 static void *
2592 threadRun(void *arg)
2593 {
2594         TState     *thread = (TState *) arg;
2595         CState     *state = thread->state;
2596         TResult    *result;
2597         FILE       *logfile = NULL; /* per-thread log file */
2598         instr_time      start,
2599                                 end;
2600         int                     nstate = thread->nstate;
2601         int                     remains = nstate;               /* number of remaining clients */
2602         int                     i;
2603
2604         result = pg_malloc(sizeof(TResult));
2605         INSTR_TIME_SET_ZERO(result->conn_time);
2606
2607         /* open log file if requested */
2608         if (use_log)
2609         {
2610                 char            logpath[64];
2611
2612                 if (thread->tid == 0)
2613                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
2614                 else
2615                         snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
2616                 logfile = fopen(logpath, "w");
2617
2618                 if (logfile == NULL)
2619                 {
2620                         fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
2621                         goto done;
2622                 }
2623         }
2624
2625         if (!is_connect)
2626         {
2627                 /* make connections to the database */
2628                 for (i = 0; i < nstate; i++)
2629                 {
2630                         if ((state[i].con = doConnect()) == NULL)
2631                                 goto done;
2632                 }
2633         }
2634
2635         /* time after thread and connections set up */
2636         INSTR_TIME_SET_CURRENT(result->conn_time);
2637         INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
2638
2639         /* send start up queries in async manner */
2640         for (i = 0; i < nstate; i++)
2641         {
2642                 CState     *st = &state[i];
2643                 Command   **commands = sql_files[st->use_file];
2644                 int                     prev_ecnt = st->ecnt;
2645
2646                 st->use_file = getrand(thread, 0, num_files - 1);
2647                 if (!doCustom(thread, st, &result->conn_time, logfile))
2648                         remains--;                      /* I've aborted */
2649
2650                 if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2651                 {
2652                         fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
2653                         remains--;                      /* I've aborted */
2654                         PQfinish(st->con);
2655                         st->con = NULL;
2656                 }
2657         }
2658
2659         while (remains > 0)
2660         {
2661                 fd_set          input_mask;
2662                 int                     maxsock;        /* max socket number to be waited */
2663                 int64           now_usec = 0;
2664                 int64           min_usec;
2665
2666                 FD_ZERO(&input_mask);
2667
2668                 maxsock = -1;
2669                 min_usec = INT64_MAX;
2670                 for (i = 0; i < nstate; i++)
2671                 {
2672                         CState     *st = &state[i];
2673                         Command   **commands = sql_files[st->use_file];
2674                         int                     sock;
2675
2676                         if (st->sleeping)
2677                         {
2678                                 int                     this_usec;
2679
2680                                 if (min_usec == INT64_MAX)
2681                                 {
2682                                         instr_time      now;
2683
2684                                         INSTR_TIME_SET_CURRENT(now);
2685                                         now_usec = INSTR_TIME_GET_MICROSEC(now);
2686                                 }
2687
2688                                 this_usec = st->until - now_usec;
2689                                 if (min_usec > this_usec)
2690                                         min_usec = this_usec;
2691                         }
2692                         else if (st->con == NULL)
2693                         {
2694                                 continue;
2695                         }
2696                         else if (commands[st->state]->type == META_COMMAND)
2697                         {
2698                                 min_usec = 0;   /* the connection is ready to run */
2699                                 break;
2700                         }
2701
2702                         sock = PQsocket(st->con);
2703                         if (sock < 0)
2704                         {
2705                                 fprintf(stderr, "bad socket: %s\n", strerror(errno));
2706                                 goto done;
2707                         }
2708
2709                         FD_SET(sock, &input_mask);
2710
2711                         if (maxsock < sock)
2712                                 maxsock = sock;
2713                 }
2714
2715                 if (min_usec > 0 && maxsock != -1)
2716                 {
2717                         int                     nsocks; /* return from select(2) */
2718
2719                         if (min_usec != INT64_MAX)
2720                         {
2721                                 struct timeval timeout;
2722
2723                                 timeout.tv_sec = min_usec / 1000000;
2724                                 timeout.tv_usec = min_usec % 1000000;
2725                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
2726                         }
2727                         else
2728                                 nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
2729                         if (nsocks < 0)
2730                         {
2731                                 if (errno == EINTR)
2732                                         continue;
2733                                 /* must be something wrong */
2734                                 fprintf(stderr, "select failed: %s\n", strerror(errno));
2735                                 goto done;
2736                         }
2737                 }
2738
2739                 /* ok, backend returns reply */
2740                 for (i = 0; i < nstate; i++)
2741                 {
2742                         CState     *st = &state[i];
2743                         Command   **commands = sql_files[st->use_file];
2744                         int                     prev_ecnt = st->ecnt;
2745
2746                         if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
2747                                                         || commands[st->state]->type == META_COMMAND))
2748                         {
2749                                 if (!doCustom(thread, st, &result->conn_time, logfile))
2750                                         remains--;      /* I've aborted */
2751                         }
2752
2753                         if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
2754                         {
2755                                 fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
2756                                 remains--;              /* I've aborted */
2757                                 PQfinish(st->con);
2758                                 st->con = NULL;
2759                         }
2760                 }
2761         }
2762
2763 done:
2764         INSTR_TIME_SET_CURRENT(start);
2765         disconnect_all(state, nstate);
2766         result->xacts = 0;
2767         for (i = 0; i < nstate; i++)
2768                 result->xacts += state[i].cnt;
2769         INSTR_TIME_SET_CURRENT(end);
2770         INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
2771         if (logfile)
2772                 fclose(logfile);
2773         return result;
2774 }
2775
2776
2777 /*
2778  * Support for duration option: set timer_exceeded after so many seconds.
2779  */
2780
2781 #ifndef WIN32
2782
2783 static void
2784 handle_sig_alarm(SIGNAL_ARGS)
2785 {
2786         timer_exceeded = true;
2787 }
2788
2789 static void
2790 setalarm(int seconds)
2791 {
2792         pqsignal(SIGALRM, handle_sig_alarm);
2793         alarm(seconds);
2794 }
2795
2796 #ifndef ENABLE_THREAD_SAFETY
2797
2798 /*
2799  * implements pthread using fork.
2800  */
2801
/*
 * State for one emulated thread: the child process running it and the pipe
 * through which that child returns its result to the parent.
 */
struct fork_pthread
{
	pid_t		pid;			/* child process id returned by fork() */
	int			pipes[2];		/* [0]: parent's read end, [1]: child's write end */
};
typedef struct fork_pthread fork_pthread;
2807
2808 static int
2809 pthread_create(pthread_t *thread,
2810                            pthread_attr_t *attr,
2811                            void *(*start_routine) (void *),
2812                            void *arg)
2813 {
2814         fork_pthread *th;
2815         void       *ret;
2816
2817         th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
2818         if (pipe(th->pipes) < 0)
2819         {
2820                 free(th);
2821                 return errno;
2822         }
2823
2824         th->pid = fork();
2825         if (th->pid == -1)                      /* error */
2826         {
2827                 free(th);
2828                 return errno;
2829         }
2830         if (th->pid != 0)                       /* in parent process */
2831         {
2832                 close(th->pipes[1]);
2833                 *thread = th;
2834                 return 0;
2835         }
2836
2837         /* in child process */
2838         close(th->pipes[0]);
2839
2840         /* set alarm again because the child does not inherit timers */
2841         if (duration > 0)
2842                 setalarm(duration);
2843
2844         ret = start_routine(arg);
2845         write(th->pipes[1], ret, sizeof(TResult));
2846         close(th->pipes[1]);
2847         free(th);
2848         exit(0);
2849 }
2850
2851 static int
2852 pthread_join(pthread_t th, void **thread_return)
2853 {
2854         int                     status;
2855
2856         while (waitpid(th->pid, &status, 0) != th->pid)
2857         {
2858                 if (errno != EINTR)
2859                         return errno;
2860         }
2861
2862         if (thread_return != NULL)
2863         {
2864                 /* assume result is TResult */
2865                 *thread_return = pg_malloc(sizeof(TResult));
2866                 if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
2867                 {
2868                         free(*thread_return);
2869                         *thread_return = NULL;
2870                 }
2871         }
2872         close(th->pipes[0]);
2873
2874         free(th);
2875         return 0;
2876 }
2877 #endif
2878 #else                                                   /* WIN32 */
2879
2880 static VOID CALLBACK
2881 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
2882 {
2883         timer_exceeded = true;
2884 }
2885
2886 static void
2887 setalarm(int seconds)
2888 {
2889         HANDLE          queue;
2890         HANDLE          timer;
2891
2892         /* This function will be called at most once, so we can cheat a bit. */
2893         queue = CreateTimerQueue();
2894         if (seconds > ((DWORD) -1) / 1000 ||
2895                 !CreateTimerQueueTimer(&timer, queue,
2896                                                            win32_timer_callback, NULL, seconds * 1000, 0,
2897                                                            WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
2898         {
2899                 fprintf(stderr, "Failed to set timer\n");
2900                 exit(1);
2901         }
2902 }
2903
2904 /* partial pthread implementation for Windows */
2905
2906 typedef struct win32_pthread
2907 {
2908         HANDLE          handle;
2909         void       *(*routine) (void *);
2910         void       *arg;
2911         void       *result;
2912 } win32_pthread;
2913
2914 static unsigned __stdcall
2915 win32_pthread_run(void *arg)
2916 {
2917         win32_pthread *th = (win32_pthread *) arg;
2918
2919         th->result = th->routine(th->arg);
2920
2921         return 0;
2922 }
2923
2924 static int
2925 pthread_create(pthread_t *thread,
2926                            pthread_attr_t *attr,
2927                            void *(*start_routine) (void *),
2928                            void *arg)
2929 {
2930         int                     save_errno;
2931         win32_pthread *th;
2932
2933         th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
2934         th->routine = start_routine;
2935         th->arg = arg;
2936         th->result = NULL;
2937
2938         th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
2939         if (th->handle == NULL)
2940         {
2941                 save_errno = errno;
2942                 free(th);
2943                 return save_errno;
2944         }
2945
2946         *thread = th;
2947         return 0;
2948 }
2949
2950 static int
2951 pthread_join(pthread_t th, void **thread_return)
2952 {
2953         if (th == NULL || th->handle == NULL)
2954                 return errno = EINVAL;
2955
2956         if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
2957         {
2958                 _dosmaperr(GetLastError());
2959                 return errno;
2960         }
2961
2962         if (thread_return)
2963                 *thread_return = th->result;
2964
2965         CloseHandle(th->handle);
2966         free(th);
2967         return 0;
2968 }
2969
2970 #endif   /* WIN32 */