]> granicus.if.org Git - postgresql/commitdiff
Multi-threaded version of pgbench contributed by ITAGAKI Takahiro,
authorTatsuo Ishii <ishii@postgresql.org>
Mon, 3 Aug 2009 15:18:14 +0000 (15:18 +0000)
committerTatsuo Ishii <ishii@postgresql.org>
Mon, 3 Aug 2009 15:18:14 +0000 (15:18 +0000)
reviewed by Greg Smith and Josh Williams.

Following is the proposal from ITAGAKI Takahiro:

Pgbench is a well-known tool for measuring postgres performance, but nowadays
it does not work well because it cannot use multiple CPUs. On the other
hand, the postgres server can use multiple CPUs very well, so the bottleneck
of the workload is *in pgbench*.

Multi-threading would be a solution. The attached patch adds a -j
(number of jobs) option to pgbench. If the value N is greater than 1,
pgbench runs with N threads. Connections are divided equally among
them (e.g. -c64 -j4 => 4 threads with 16 connections each). It can
run on POSIX platforms with pthreads and on Windows with win32 threads.

Here are results of multi-threaded pgbench runs on Fedora 11 with an Intel
Core i7 (8 logical cores = 4 physical cores * HT). -j8 (8 threads) was
the best, with a tps 4.5 times that of -j1, which is the traditional
(single-threaded) result.

$ pgbench -i -s10
$ pgbench -n -S -c64 -j1   =>  tps = 11600.158593
$ pgbench -n -S -c64 -j2   =>  tps = 17947.100954
$ pgbench -n -S -c64 -j4   =>  tps = 26571.124001
$ pgbench -n -S -c64 -j8   =>  tps = 52725.470403
$ pgbench -n -S -c64 -j16  =>  tps = 38976.675319
$ pgbench -n -S -c64 -j32  =>  tps = 28998.499601
$ pgbench -n -S -c64 -j64  =>  tps = 26701.877815

Is it acceptable to use pthreads in a contrib module?
If so, I will add the patch to the next commitfest.

contrib/pgbench/pgbench.c
doc/src/sgml/pgbench.sgml

index 7ede6954aa16393ce7cdea4f5a5f75d1aee30772..0c3704a2ff14dad5749ef2e96b8eafa7aec2c32a 100644 (file)
@@ -4,7 +4,7 @@
  * A simple benchmark program for PostgreSQL
  * Originally written by Tatsuo Ishii and enhanced by many contributors.
  *
- * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.88 2009/07/30 09:28:00 mha Exp $
+ * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.89 2009/08/03 15:18:14 ishii Exp $
  * Copyright (c) 2000-2009, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -35,6 +35,7 @@
 
 #include "libpq-fe.h"
 #include "pqsignal.h"
+#include "portability/instr_time.h"
 
 #include <ctype.h>
 
 #include <sys/resource.h>              /* for getrlimit */
 #endif
 
+#ifndef INT64_MAX
+#define INT64_MAX      INT64CONST(0x7FFFFFFFFFFFFFFF)
+#endif
+
+/*
+ * Multi-platform pthread implementations
+ */
+
+#ifdef WIN32
+/* Use native win32 threads on Windows */
+typedef struct win32_pthread   *pthread_t;
+typedef int                                            pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#elif defined(ENABLE_THREAD_SAFETY)
+/* Use platform-dependent pthread */
+#include <pthread.h>
+
+#else
+
+#include <sys/wait.h>
+/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
+#define pthread_t                              pg_pthread_t
+#define pthread_attr_t                 pg_pthread_attr_t
+#define pthread_create                 pg_pthread_create
+#define pthread_join                   pg_pthread_join
+typedef struct fork_pthread       *pthread_t;
+typedef int                                            pthread_attr_t;
+static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
+static int pthread_join(pthread_t th, void **thread_return);
+
+#endif
+
 extern char *optarg;
 extern int     optind;
 
@@ -74,7 +109,6 @@ extern int   optind;
 
 #define DEFAULT_NXACTS 10              /* default nxacts */
 
-int                    nclients = 1;           /* default number of simulated clients */
 int                    nxacts = 0;                     /* number of transactions per client */
 int                    duration = 0;           /* duration in seconds */
 
@@ -102,8 +136,6 @@ FILE           *LOGFILE = NULL;
 
 bool           use_log;                        /* log transaction latencies to a file */
 
-int                    remains;                        /* number of remaining clients */
-
 int                    is_connect;                     /* establish connection  for each transaction */
 
 char      *pghost = "";
@@ -138,14 +170,33 @@ typedef struct
        int                     listen;                 /* 0 indicates that an async query has been
                                                                 * sent */
        int                     sleeping;               /* 1 indicates that the client is napping */
-       struct timeval until;           /* napping until */
+       int64           until;                  /* napping until (usec) */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;
-       struct timeval txn_begin;       /* used for measuring latencies */
+       instr_time      txn_begin;              /* used for measuring latencies */
        int                     use_file;               /* index in sql_files for this client */
        bool            prepared[MAX_FILES];
 } CState;
 
+/*
+ * Thread state and result
+ */
+typedef struct
+{
+       pthread_t               thread;         /* thread handle */
+       CState             *state;              /* array of CState */
+       int                             nstate;         /* length of state */
+       instr_time              start_time;     /* thread start time */
+} TState;
+
+#define INVALID_THREAD         ((pthread_t) 0)
+
+typedef struct
+{
+       instr_time              conn_time;
+       int                             xacts;
+} TResult;
+
 /*
  * queries read from files
  */
@@ -171,8 +222,9 @@ typedef struct
        char       *argv[MAX_ARGS]; /* command list */
 } Command;
 
-Command   **sql_files[MAX_FILES];              /* SQL script files */
-int                    num_files;                      /* number of script files */
+static Command   **sql_files[MAX_FILES];       /* SQL script files */
+static int                     num_files;                              /* number of script files */
+static int                     debug = 0;                              /* debug flag */
 
 /* default scenario */
 static char *tpc_b = {
@@ -215,44 +267,9 @@ static char *select_only = {
        "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
 };
 
-/* Connection overhead time */
-static struct timeval conn_total_time = {0, 0};
-
 /* Function prototypes */
 static void setalarm(int seconds);
-
-
-/* Calculate total time */
-static void
-addTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
-       int                     sec = t1->tv_sec + t2->tv_sec;
-       int                     usec = t1->tv_usec + t2->tv_usec;
-
-       if (usec >= 1000000)
-       {
-               usec -= 1000000;
-               sec++;
-       }
-       result->tv_sec = sec;
-       result->tv_usec = usec;
-}
-
-/* Calculate time difference */
-static void
-diffTime(struct timeval * t1, struct timeval * t2, struct timeval * result)
-{
-       int                     sec = t1->tv_sec - t2->tv_sec;
-       int                     usec = t1->tv_usec - t2->tv_usec;
-
-       if (usec < 0)
-       {
-               usec += 1000000;
-               sec--;
-       }
-       result->tv_sec = sec;
-       result->tv_usec = usec;
-}
+static void* threadRun(void *arg);
 
 static void
 usage(const char *progname)
@@ -270,6 +287,7 @@ usage(const char *progname)
                   "  -D VARNAME=VALUE\n"
                   "               define variable for use by custom script\n"
                   "  -f FILENAME  read transaction script from FILENAME\n"
+                  "  -j NUM       number of threads (default: 1)\n"
                   "  -l           write transaction times to log file\n"
                   "  -M {simple|extended|prepared}\n"
                   "               protocol for submitting queries to server (default: simple)\n"
@@ -379,29 +397,6 @@ discard_response(CState *state)
        } while (res);
 }
 
-/* check to see if the SQL result was good */
-static int
-check(CState *state, PGresult *res, int n)
-{
-       CState     *st = &state[n];
-
-       switch (PQresultStatus(res))
-       {
-               case PGRES_COMMAND_OK:
-               case PGRES_TUPLES_OK:
-                       /* OK */
-                       break;
-               default:
-                       fprintf(stderr, "Client %d aborted in state %d: %s",
-                                       n, st->state, PQerrorMessage(st->con));
-                       remains--;                      /* I've aborted */
-                       PQfinish(st->con);
-                       st->con = NULL;
-                       return (-1);
-       }
-       return (0);                                     /* OK */
-}
-
 static int
 compareVariables(const void *v1, const void *v2)
 {
@@ -598,11 +593,24 @@ preparedStatementName(char *buffer, int file, int state)
        sprintf(buffer, "P%d_%d", file, state);
 }
 
-static void
-doCustom(CState *state, int n, int debug)
+static bool
+clientDone(CState *st, bool ok)
+{
+       (void) ok;      /* unused */
+
+       if (st->con != NULL)
+       {
+               PQfinish(st->con);
+               st->con = NULL;
+       }
+       return false;   /* always false */
+}
+
+/* return false iff client should be disconnected */
+static bool
+doCustom(CState *st, instr_time *conn_time)
 {
        PGresult   *res;
-       CState     *st = &state[n];
        Command   **commands;
 
 top:
@@ -610,16 +618,13 @@ top:
 
        if (st->sleeping)
        {                                                       /* are we sleeping? */
-               int                     usec;
-               struct timeval now;
+               instr_time      now;
 
-               gettimeofday(&now, NULL);
-               usec = (st->until.tv_sec - now.tv_sec) * 1000000 +
-                       st->until.tv_usec - now.tv_usec;
-               if (usec <= 0)
+               INSTR_TIME_SET_CURRENT(now);
+               if (st->until <= INSTR_TIME_GET_MICROSEC(now))
                        st->sleeping = 0;       /* Done sleeping, go ahead with next command */
                else
-                       return;                         /* Still sleeping, nothing to do here */
+                       return true;            /* Still sleeping, nothing to do here */
        }
 
        if (st->listen)
@@ -627,17 +632,14 @@ top:
                if (commands[st->state]->type == SQL_COMMAND)
                {
                        if (debug)
-                               fprintf(stderr, "client %d receiving\n", n);
+                               fprintf(stderr, "client %d receiving\n", st->id);
                        if (!PQconsumeInput(st->con))
                        {                                       /* there's something wrong */
-                               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
-                               remains--;              /* I've aborted */
-                               PQfinish(st->con);
-                               st->con = NULL;
-                               return;
+                               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state);
+                               return clientDone(st, false);
                        }
                        if (PQisBusy(st->con))
-                               return;                 /* don't have the whole result yet */
+                               return true;    /* don't have the whole result yet */
                }
 
                /*
@@ -645,25 +647,35 @@ top:
                 */
                if (use_log && commands[st->state + 1] == NULL)
                {
-                       double          diff;
-                       struct timeval now;
-
-                       gettimeofday(&now, NULL);
-                       diff = (int) (now.tv_sec - st->txn_begin.tv_sec) * 1000000.0 +
-                               (int) (now.tv_usec - st->txn_begin.tv_usec);
-
-                       fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n",
-                                       st->id, st->cnt, diff, st->use_file,
-                                       (long) now.tv_sec, (long) now.tv_usec);
+                       instr_time      diff;
+                       double          sec;
+                       double          msec;
+                       double          usec;
+
+                       INSTR_TIME_SET_CURRENT(diff);
+                       INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+                       sec = INSTR_TIME_GET_DOUBLE(diff);
+                       msec = INSTR_TIME_GET_MILLISEC(diff);
+                       usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+
+                       fprintf(LOGFILE, "%d %d %.0f %d %.0f %.0f\n",
+                                       st->id, st->cnt, usec, st->use_file,
+                                       sec, usec - sec * 1000.0);
                }
 
                if (commands[st->state]->type == SQL_COMMAND)
                {
                        res = PQgetResult(st->con);
-                       if (check(state, res, n))
+                       switch (PQresultStatus(res))
                        {
-                               PQclear(res);
-                               return;
+                               case PGRES_COMMAND_OK:
+                               case PGRES_TUPLES_OK:
+                                       break;  /* OK */
+                               default:
+                                       fprintf(stderr, "Client %d aborted in state %d: %s",
+                                               st->id, st->state, PQerrorMessage(st->con));
+                                       PQclear(res);
+                                       return clientDone(st, false);
                        }
                        PQclear(res);
                        discard_response(st);
@@ -679,15 +691,7 @@ top:
 
                        ++st->cnt;
                        if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
-                       {
-                               remains--;              /* I've done */
-                               if (st->con != NULL)
-                               {
-                                       PQfinish(st->con);
-                                       st->con = NULL;
-                               }
-                               return;
-                       }
+                               return clientDone(st, true);    /* exit success */
                }
 
                /* increment state counter */
@@ -702,27 +706,20 @@ top:
 
        if (st->con == NULL)
        {
-               struct timeval t1,
-                                       t2,
-                                       t3;
+               instr_time      start, end;
 
-               gettimeofday(&t1, NULL);
+               INSTR_TIME_SET_CURRENT(start);
                if ((st->con = doConnect()) == NULL)
                {
-                       fprintf(stderr, "Client %d aborted in establishing connection.\n",
-                                       n);
-                       remains--;                      /* I've aborted */
-                       PQfinish(st->con);
-                       st->con = NULL;
-                       return;
+                       fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id);
+                       return clientDone(st, false);
                }
-               gettimeofday(&t2, NULL);
-               diffTime(&t2, &t1, &t3);
-               addTime(&conn_total_time, &t3, &conn_total_time);
+               INSTR_TIME_SET_CURRENT(end);
+               INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
        }
 
        if (use_log && st->state == 0)
-               gettimeofday(&(st->txn_begin), NULL);
+               INSTR_TIME_SET_CURRENT(st->txn_begin);
 
        if (commands[st->state]->type == SQL_COMMAND)
        {
@@ -738,11 +735,11 @@ top:
                        {
                                fprintf(stderr, "out of memory\n");
                                st->ecnt++;
-                               return;
+                               return true;
                        }
 
                        if (debug)
-                               fprintf(stderr, "client %d sending %s\n", n, sql);
+                               fprintf(stderr, "client %d sending %s\n", st->id, sql);
                        r = PQsendQuery(st->con, sql);
                        free(sql);
                }
@@ -754,7 +751,7 @@ top:
                        getQueryParams(st, command, params);
 
                        if (debug)
-                               fprintf(stderr, "client %d sending %s\n", n, sql);
+                               fprintf(stderr, "client %d sending %s\n", st->id, sql);
                        r = PQsendQueryParams(st->con, sql, command->argc - 1,
                                                                  NULL, params, NULL, NULL, 0);
                }
@@ -788,7 +785,7 @@ top:
                        preparedStatementName(name, st->use_file, st->state);
 
                        if (debug)
-                               fprintf(stderr, "client %d sending %s\n", n, name);
+                               fprintf(stderr, "client %d sending %s\n", st->id, name);
                        r = PQsendQueryPrepared(st->con, name, command->argc - 1,
                                                                        params, NULL, NULL, 0);
                }
@@ -798,7 +795,7 @@ top:
                if (r == 0)
                {
                        if (debug)
-                               fprintf(stderr, "client %d cannot send %s\n", n, command->argv[0]);
+                               fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]);
                        st->ecnt++;
                }
                else
@@ -812,7 +809,7 @@ top:
 
                if (debug)
                {
-                       fprintf(stderr, "client %d executing \\%s", n, argv[0]);
+                       fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
                        for (i = 1; i < argc; i++)
                                fprintf(stderr, " %s", argv[i]);
                        fprintf(stderr, "\n");
@@ -831,7 +828,7 @@ top:
                                {
                                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                                        st->ecnt++;
-                                       return;
+                                       return true;
                                }
                                min = atoi(var);
                        }
@@ -853,7 +850,7 @@ top:
                                {
                                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]);
                                        st->ecnt++;
-                                       return;
+                                       return true;
                                }
                                max = atoi(var);
                        }
@@ -864,7 +861,7 @@ top:
                        {
                                fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max);
                                st->ecnt++;
-                               return;
+                               return true;
                        }
 
 #ifdef DEBUG
@@ -876,7 +873,7 @@ top:
                        {
                                fprintf(stderr, "%s: out of memory\n", argv[0]);
                                st->ecnt++;
-                               return;
+                               return true;
                        }
 
                        st->listen = 1;
@@ -894,7 +891,7 @@ top:
                                {
                                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]);
                                        st->ecnt++;
-                                       return;
+                                       return true;
                                }
                                ope1 = atoi(var);
                        }
@@ -911,7 +908,7 @@ top:
                                        {
                                                fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]);
                                                st->ecnt++;
-                                               return;
+                                               return true;
                                        }
                                        ope2 = atoi(var);
                                }
@@ -930,7 +927,7 @@ top:
                                        {
                                                fprintf(stderr, "%s: division by zero\n", argv[0]);
                                                st->ecnt++;
-                                               return;
+                                               return true;
                                        }
                                        snprintf(res, sizeof(res), "%d", ope1 / ope2);
                                }
@@ -938,7 +935,7 @@ top:
                                {
                                        fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]);
                                        st->ecnt++;
-                                       return;
+                                       return true;
                                }
                        }
 
@@ -946,7 +943,7 @@ top:
                        {
                                fprintf(stderr, "%s: out of memory\n", argv[0]);
                                st->ecnt++;
-                               return;
+                               return true;
                        }
 
                        st->listen = 1;
@@ -955,7 +952,7 @@ top:
                {
                        char       *var;
                        int                     usec;
-                       struct timeval now;
+                       instr_time now;
 
                        if (*argv[1] == ':')
                        {
@@ -963,7 +960,7 @@ top:
                                {
                                        fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]);
                                        st->ecnt++;
-                                       return;
+                                       return true;
                                }
                                usec = atoi(var);
                        }
@@ -980,9 +977,8 @@ top:
                        else
                                usec *= 1000000;
 
-                       gettimeofday(&now, NULL);
-                       st->until.tv_sec = now.tv_sec + (now.tv_usec + usec) / 1000000;
-                       st->until.tv_usec = (now.tv_usec + usec) % 1000000;
+                       INSTR_TIME_SET_CURRENT(now);
+                       st->until = INSTR_TIME_GET_MICROSEC(now) + usec;
                        st->sleeping = 1;
 
                        st->listen = 1;
@@ -990,18 +986,23 @@ top:
 
                goto top;
        }
+
+       return true;
 }
 
 /* discard connections */
 static void
-disconnect_all(CState *state)
+disconnect_all(CState *state, int length)
 {
        int                     i;
 
-       for (i = 0; i < nclients; i++)
+       for (i = 0; i < length; i++)
        {
                if (state[i].con)
+               {
                        PQfinish(state[i].con);
+                       state[i].con = NULL;
+               }
        }
 }
 
@@ -1267,6 +1268,24 @@ process_commands(char *buf)
                                return NULL;
                        }
 
+                       /*
+                        * Split argument into number and unit for "sleep 1ms" or so.
+                        * We don't have to terminate the number argument with null
+                        * because it will be parsed with atoi, which ignores trailing
+                        * non-digit characters.
+                        */
+                       if (my_commands->argv[1][0] != ':')
+                       {
+                               char    *c = my_commands->argv[1];
+                               while (isdigit(*c)) { c++; }
+                               if (*c)
+                               {
+                                       my_commands->argv[2] = c;
+                                       if (my_commands->argc < 3)
+                                               my_commands->argc = 3;
+                               }
+                       }
+
                        if (my_commands->argc >= 3)
                        {
                                if (pg_strcasecmp(my_commands->argv[2], "us") != 0 &&
@@ -1453,25 +1472,18 @@ process_builtin(char *tb)
 
 /* print out results */
 static void
-printResults(
-                        int ttype, CState *state,
-                        struct timeval * start_time, struct timeval * end_time)
+printResults(int ttype, int normal_xacts, int nclients, int nthreads,
+                        instr_time total_time, instr_time conn_total_time)
 {
-       double          t1,
-                               t2;
-       int                     i;
-       int                     normal_xacts = 0;
+       double          time_include,
+                               tps_include,
+                               tps_exclude;
        char       *s;
 
-       for (i = 0; i < nclients; i++)
-               normal_xacts += state[i].cnt;
-
-       t1 = (end_time->tv_sec - start_time->tv_sec) * 1000000.0 + (end_time->tv_usec - start_time->tv_usec);
-       t1 = normal_xacts * 1000000.0 / t1;
-
-       t2 = (end_time->tv_sec - start_time->tv_sec - conn_total_time.tv_sec) * 1000000.0 +
-               (end_time->tv_usec - start_time->tv_usec - conn_total_time.tv_usec);
-       t2 = normal_xacts * 1000000.0 / t2;
+       time_include = INSTR_TIME_GET_DOUBLE(total_time);
+       tps_include = normal_xacts / time_include;
+       tps_exclude = normal_xacts / (time_include -
+               (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads));
 
        if (ttype == 0)
                s = "TPC-B (sort of)";
@@ -1486,6 +1498,7 @@ printResults(
        printf("scaling factor: %d\n", scale);
        printf("query mode: %s\n", QUERYMODE[querymode]);
        printf("number of clients: %d\n", nclients);
+       printf("number of threads: %d\n", nthreads);
        if (duration <= 0)
        {
                printf("number of transactions per client: %d\n", nxacts);
@@ -1498,8 +1511,8 @@ printResults(
                printf("number of transactions actually processed: %d\n",
                           normal_xacts);
        }
-       printf("tps = %f (including connections establishing)\n", t1);
-       printf("tps = %f (excluding connections establishing)\n", t2);
+       printf("tps = %f (including connections establishing)\n", tps_include);
+       printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 }
 
 
@@ -1507,29 +1520,26 @@ int
 main(int argc, char **argv)
 {
        int                     c;
+       int                     nclients = 1;           /* default number of simulated clients */
+       int                     nthreads = 1;           /* default number of threads */
        int                     is_init_mode = 0;               /* initialize mode? */
        int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
        int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
-       int                     debug = 0;              /* debug flag */
        int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT only,
                                                                 * 2: skip update of branches and tellers */
        char       *filename = NULL;
        bool            scale_given = false;
 
        CState     *state;                      /* status of clients */
+       TState     *threads;            /* array of thread */
 
-       struct timeval start_time;      /* start up time */
-       struct timeval end_time;        /* end time */
+       instr_time      start_time;             /* start up time */
+       instr_time      total_time;
+       instr_time      conn_total_time;
+       int                     total_xacts;
 
        int                     i;
 
-       fd_set          input_mask;
-       int                     nsocks;                 /* return from select(2) */
-       int                     maxsock;                /* max socket number to be waited */
-       struct timeval now;
-       struct timeval timeout;
-       int                     min_usec;
-
 #ifdef HAVE_GETRLIMIT
        struct rlimit rlim;
 #endif
@@ -1579,7 +1589,7 @@ main(int argc, char **argv)
 
        memset(state, 0, sizeof(*state));
 
-       while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:")) != -1)
+       while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:")) != -1)
        {
                switch (c)
                {
@@ -1632,6 +1642,14 @@ main(int argc, char **argv)
                                }
 #endif   /* HAVE_GETRLIMIT */
                                break;
+                       case 'j':       /* jobs */
+                               nthreads = atoi(optarg);
+                               if (nthreads <= 0)
+                               {
+                                       fprintf(stderr, "invalid number of threads: %d\n", nthreads);
+                                       exit(1);
+                               }
+                               break;
                        case 'C':
                                is_connect = 1;
                                break;
@@ -1752,7 +1770,11 @@ main(int argc, char **argv)
        if (nxacts <= 0 && duration <= 0)
                nxacts = DEFAULT_NXACTS;
 
-       remains = nclients;
+       if (nclients % nthreads != 0)
+       {
+               fprintf(stderr, "number of clients (%d) must be a multiple number of threads (%d)\n", nclients, nthreads);
+               exit(1);
+       }
 
        if (nclients > 1)
        {
@@ -1770,6 +1792,7 @@ main(int argc, char **argv)
                {
                        int                     j;
 
+                       state[i].id = i;
                        for (j = 0; j < state[0].nvariables; j++)
                        {
                                if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false)
@@ -1879,33 +1902,8 @@ main(int argc, char **argv)
        PQfinish(con);
 
        /* set random seed */
-       gettimeofday(&start_time, NULL);
-       srandom((unsigned int) start_time.tv_usec);
-
-       /* get start up time */
-       gettimeofday(&start_time, NULL);
-
-       /* set alarm if duration is specified. */
-       if (duration > 0)
-               setalarm(duration);
-
-       if (is_connect == 0)
-       {
-               struct timeval t,
-                                       now;
-
-               /* make connections to the database */
-               for (i = 0; i < nclients; i++)
-               {
-                       state[i].id = i;
-                       if ((state[i].con = doConnect()) == NULL)
-                               exit(1);
-               }
-               /* time after connections set up */
-               gettimeofday(&now, NULL);
-               diffTime(&now, &start_time, &t);
-               addTime(&conn_total_time, &t, &conn_total_time);
-       }
+       INSTR_TIME_SET_CURRENT(start_time);
+       srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time));
 
        /* process bultin SQL scripts */
        switch (ttype)
@@ -1929,140 +1927,227 @@ main(int argc, char **argv)
                        break;
        }
 
+       /* get start up time */
+       INSTR_TIME_SET_CURRENT(start_time);
+
+       /* set alarm if duration is specified. */
+       if (duration > 0)
+               setalarm(duration);
+
+       /* start threads */
+       threads = (TState *) malloc(sizeof(TState) * nthreads);
+       for (i = 0; i < nthreads; i++)
+       {
+               threads[i].state = &state[nclients / nthreads * i];
+               threads[i].nstate = nclients / nthreads;
+               INSTR_TIME_SET_CURRENT(threads[i].start_time);
+
+               /* the first thread (i = 0) is executed by main thread */
+               if (i > 0)
+               {
+                       int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]);
+                       if (err != 0 || threads[i].thread == INVALID_THREAD)
+                       {
+                               fprintf(stderr, "cannot create thread: %s\n", strerror(err));
+                               exit(1);
+                       }
+               }
+               else
+               {
+                       threads[i].thread = INVALID_THREAD;
+               }
+       }
+
+       /* wait for threads and accumulate results */
+       total_xacts = 0;
+       INSTR_TIME_SET_ZERO(conn_total_time);
+       for (i = 0; i < nthreads; i++)
+       {
+               void *ret = NULL;
+
+               if (threads[i].thread == INVALID_THREAD)
+                       ret = threadRun(&threads[i]);
+               else
+                       pthread_join(threads[i].thread, &ret);
+
+               if (ret != NULL)
+               {
+                       TResult *r = (TResult *) ret;
+                       total_xacts += r->xacts;
+                       INSTR_TIME_ADD(conn_total_time, r->conn_time);
+                       free(ret);
+               }
+       }
+       disconnect_all(state, nclients);
+
+       /* get end time */
+       INSTR_TIME_SET_CURRENT(total_time);
+       INSTR_TIME_SUBTRACT(total_time, start_time);
+       printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time);
+       if (LOGFILE)
+               fclose(LOGFILE);
+
+       return 0;
+}
+
+/*
+ * Body of one worker: runs the thread->nstate clients that start at
+ * thread->state until none of them remain.
+ *
+ * arg is a TState *.  Returns a malloc'ed TResult in which xacts is the sum
+ * of state[i].cnt over this worker's clients and conn_time is the
+ * accumulated connection time; main() frees the returned result.
+ */
+static void *
+threadRun(void *arg)
+{
+       TState     *thread = (TState *) arg;
+       CState     *state = thread->state;
+       TResult    *result;
+       instr_time      start, end;
+       int                     nstate = thread->nstate;
+       int                     remains = nstate;       /* number of remaining clients */
+       int                     i;
+
+       result = malloc(sizeof(TResult));
+       INSTR_TIME_SET_ZERO(result->conn_time);
+
+       if (is_connect == 0)
+       {
+               /* make connections to the database */
+               for (i = 0; i < nstate; i++)
+               {
+                       /* a failed connection ends this worker; results so far are still returned */
+                       if ((state[i].con = doConnect()) == NULL)
+                               goto done;
+               }
+       }
+
+       /* time after thread and connections set up */
+       INSTR_TIME_SET_CURRENT(result->conn_time);
+       INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+
        /* send start up queries in async manner */
-       for (i = 0; i < nclients; i++)
+       for (i = 0; i < nstate; i++)
        {
-               Command   **commands = sql_files[state[i].use_file];
-               int                     prev_ecnt = state[i].ecnt;
+               CState     *st = &state[i];
+               Command   **commands = sql_files[st->use_file];
+               int                     prev_ecnt = st->ecnt;
 
-               state[i].use_file = getrand(0, num_files - 1);
-               doCustom(state, i, debug);
+               st->use_file = getrand(0, num_files - 1);
+               if (!doCustom(st, &result->conn_time))
+                       remains--;              /* I've aborted */
 
-               if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+               if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
                {
-                       fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, state[i].state);
+                       fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state);
                        remains--;                      /* I've aborted */
-                       PQfinish(state[i].con);
-                       state[i].con = NULL;
+                       PQfinish(st->con);
+                       st->con = NULL;
                }
        }
 
-       for (;;)
+       while (remains > 0)
        {
-               if (remains <= 0)
-               {                                               /* all done ? */
-                       disconnect_all(state);
-                       /* get end time */
-                       gettimeofday(&end_time, NULL);
-                       printResults(ttype, state, &start_time, &end_time);
-                       if (LOGFILE)
-                               fclose(LOGFILE);
-                       exit(0);
-               }
+               fd_set                  input_mask;
+               int                             maxsock;                /* max socket number to be waited */
+               int64                   now_usec = 0;
+               int64                   min_usec;
 
                FD_ZERO(&input_mask);
 
                maxsock = -1;
-               min_usec = -1;
-               for (i = 0; i < nclients; i++)
+               /* INT64_MAX means no sleeping client has set a timeout yet */
+               min_usec = INT64_MAX;
+               for (i = 0; i < nstate; i++)
                {
-                       Command   **commands = sql_files[state[i].use_file];
+                       CState     *st = &state[i];
+                       Command   **commands = sql_files[st->use_file];
+                       int                     sock;
 
-                       if (state[i].sleeping)
+                       if (st->sleeping)
                        {
                                int                     this_usec;
-                               int                     sock = PQsocket(state[i].con);
 
-                               if (min_usec < 0)
+                               if (min_usec == INT64_MAX)
                                {
-                                       gettimeofday(&now, NULL);
-                                       min_usec = 0;
+                                       instr_time      now;
+                                       INSTR_TIME_SET_CURRENT(now);
+                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
                                }
 
-                               this_usec = (state[i].until.tv_sec - now.tv_sec) * 1000000 +
-                                       state[i].until.tv_usec - now.tv_usec;
-
-                               if (this_usec > 0 && (min_usec == 0 || this_usec < min_usec))
+                               /* NOTE(review): the difference involves int64 now_usec but is stored in an int -- confirm long sleeps cannot overflow */
+                               this_usec = st->until - now_usec;
+                               if (min_usec > this_usec)
                                        min_usec = this_usec;
-
-                               FD_SET          (sock, &input_mask);
-
-                               if (maxsock < sock)
-                                       maxsock = sock;
                        }
-                       else if (state[i].con && commands[state[i].state]->type != META_COMMAND)
+                       else if (st->con == NULL)
                        {
-                               int                     sock = PQsocket(state[i].con);
-
-                               if (sock < 0)
-                               {
-                                       disconnect_all(state);
-                                       exit(1);
-                               }
-                               FD_SET          (sock, &input_mask);
+                               continue;
+                       }
+                       else if (commands[st->state]->type == META_COMMAND)
+                       {
+                               min_usec = 0;   /* the connection is ready to run */
+                               break;
+                       }
 
-                               if (maxsock < sock)
-                                       maxsock = sock;
+                       sock = PQsocket(st->con);
+                       if (sock < 0)
+                       {
+                               fprintf(stderr, "bad socket: %s\n", strerror(errno));
+                               goto done;
                        }
+
+                       FD_SET(sock, &input_mask);
+                       if (maxsock < sock)
+                               maxsock = sock;
                }
 
-               if (maxsock != -1)
+               /* select() is skipped when min_usec <= 0 or when no socket was added to input_mask */
+               if (min_usec > 0 && maxsock != -1)
                {
-                       if (min_usec >= 0)
+                       int             nsocks;                 /* return from select(2) */
+
+                       if (min_usec != INT64_MAX)
                        {
+                               struct timeval  timeout;
                                timeout.tv_sec = min_usec / 1000000;
                                timeout.tv_usec = min_usec % 1000000;
-
-                               nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
-                                                               (fd_set *) NULL, &timeout);
+                               nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
                        }
                        else
-                               nsocks = select(maxsock + 1, &input_mask, (fd_set *) NULL,
-                                                               (fd_set *) NULL, (struct timeval *) NULL);
+                               nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
                        if (nsocks < 0)
                        {
                                if (errno == EINTR)
                                        continue;
                                /* must be something wrong */
-                               disconnect_all(state);
                                fprintf(stderr, "select failed: %s\n", strerror(errno));
-                               exit(1);
-                       }
-#ifdef NOT_USED
-                       else if (nsocks == 0)
-                       {                                       /* timeout */
-                               fprintf(stderr, "select timeout\n");
-                               for (i = 0; i < nclients; i++)
-                               {
-                                       fprintf(stderr, "client %d:state %d cnt %d ecnt %d listen %d\n",
-                                                       i, state[i].state, state[i].cnt, state[i].ecnt, state[i].listen);
-                               }
-                               exit(0);
+                               goto done;
                        }
-#endif
                }
 
                /* ok, backend returns reply */
-               for (i = 0; i < nclients; i++)
+               for (i = 0; i < nstate; i++)
                {
-                       Command   **commands = sql_files[state[i].use_file];
-                       int                     prev_ecnt = state[i].ecnt;
+                       CState     *st = &state[i];
+                       Command   **commands = sql_files[st->use_file];
+                       int                     prev_ecnt = st->ecnt;
 
-                       if (state[i].con && (FD_ISSET(PQsocket(state[i].con), &input_mask)
-                                                 || commands[state[i].state]->type == META_COMMAND))
+                       if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
+                                                 || commands[st->state]->type == META_COMMAND))
                        {
-                               doCustom(state, i, debug);
+                               if (!doCustom(st, &result->conn_time))
+                                       remains--;              /* I've aborted */
                        }
 
-                       if (state[i].ecnt > prev_ecnt && commands[state[i].state]->type == META_COMMAND)
+                       if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
                        {
-                               fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, state[i].state);
+                               fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state);
                                remains--;              /* I've aborted */
-                               PQfinish(state[i].con);
-                               state[i].con = NULL;
+                               PQfinish(st->con);
+                               st->con = NULL;
                        }
                }
        }
+
+       /*
+        * Common exit for normal completion and for errors: disconnect this
+        * worker's clients, sum their transaction counts, and accumulate the
+        * time between start and end into conn_time.
+        */
+done:
+       INSTR_TIME_SET_CURRENT(start);
+       disconnect_all(state, nstate);
+       result->xacts = 0;
+       for (i = 0; i < nstate; i++)
+               result->xacts += state[i].cnt;
+       INSTR_TIME_SET_CURRENT(end);
+       INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+       return result;
 }
 
 
@@ -2084,6 +2169,87 @@ setalarm(int seconds)
        pqsignal(SIGALRM, handle_sig_alarm);
        alarm(seconds);
 }
+
+#ifndef ENABLE_THREAD_SAFETY
+
+/*
+ * implements pthread using fork.
+ */
+
+/* Handle for one worker that is emulated by a forked child process. */
+typedef struct fork_pthread
+{
+       pid_t   pid;            /* value returned by fork(); waited on in pthread_join */
+       int             pipes[2];       /* from pipe(): child writes its result to [1], parent reads it from [0] */
+} fork_pthread;
+
+/*
+ * pthread_create() emulation that runs start_routine(arg) in a forked child.
+ *
+ * In the parent, stores a newly allocated fork_pthread handle in *thread and
+ * returns 0; on failure returns an errno value with nothing left allocated
+ * or open.  The child never returns from here: it writes the sizeof(TResult)
+ * bytes returned by start_routine to the pipe and exits.  attr is ignored.
+ */
+static int
+pthread_create(pthread_t *thread,
+                          pthread_attr_t *attr,
+                          void * (*start_routine)(void *),
+                          void *arg)
+{
+       fork_pthread   *th;
+       void               *ret;
+       int                             save_errno;
+
+       th = (fork_pthread *) malloc(sizeof(fork_pthread));
+       if (th == NULL)
+               return ENOMEM;
+
+       if (pipe(th->pipes) < 0)
+       {
+               save_errno = errno;
+               free(th);
+               return save_errno;
+       }
+
+       th->pid = fork();
+       if (th->pid == -1)      /* error */
+       {
+               /* save errno first: close() and free() may overwrite it */
+               save_errno = errno;
+               close(th->pipes[0]);
+               close(th->pipes[1]);
+               free(th);
+               return save_errno;
+       }
+       if (th->pid != 0)       /* parent process */
+       {
+               close(th->pipes[1]);
+               *thread = th;
+               return 0;
+       }
+
+       /* child process */
+       close(th->pipes[0]);
+
+       /* set alarm again because the child does not inherit timers */
+       if (duration > 0)
+               setalarm(duration);
+
+       ret = start_routine(arg);
+
+       /* a missing or short result is reported to the parent as a short read */
+       if (ret == NULL ||
+               write(th->pipes[1], ret, sizeof(TResult)) != sizeof(TResult))
+               exit(1);
+       close(th->pipes[1]);
+       free(th);
+       exit(0);
+}
+
+/*
+ * pthread_join() emulation for a worker created by the fork-based
+ * pthread_create().
+ *
+ * Waits for the child to exit, then, if thread_return is not NULL, reads a
+ * TResult from the pipe into newly malloc'ed memory and stores it in
+ * *thread_return (NULL if the result could not be read in full).  The handle
+ * and its pipe are always released, so th must not be used afterwards.
+ * Returns 0 on success or the errno value from waitpid().
+ */
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+       int             status;
+       int             rc = 0;
+
+       while (waitpid(th->pid, &status, 0) != th->pid)
+       {
+               if (errno != EINTR)
+               {
+                       /* remember the error but still release the handle below */
+                       rc = errno;
+                       break;
+               }
+       }
+
+       if (rc == 0 && thread_return != NULL)
+       {
+               /* assume result is TResult */
+               *thread_return = malloc(sizeof(TResult));
+               if (*thread_return != NULL &&
+                       read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
+               {
+                       free(*thread_return);
+                       *thread_return = NULL;
+               }
+       }
+       close(th->pipes[0]);
+
+       free(th);
+       return rc;
+}
+
+#endif
+
 #else                                                  /* WIN32 */
 
 static VOID CALLBACK
@@ -2110,4 +2276,70 @@ setalarm(int seconds)
        }
 }
 
+/* partial pthread implementation for Windows */
+
+/* Handle for one worker thread started through _beginthreadex. */
+typedef struct win32_pthread
+{
+       HANDLE          handle;         /* thread handle; waited on and closed in pthread_join */
+       void       *(*routine)(void *);         /* start routine given to pthread_create */
+       void       *arg;                /* argument passed to routine */
+       void       *result;             /* value returned by routine; NULL until it finishes */
+} win32_pthread;
+
+/*
+ * Thread entry point passed to _beginthreadex: calls the stored start
+ * routine with its argument and keeps the return value in the handle.
+ */
+static unsigned __stdcall
+win32_pthread_run(void *arg)
+{
+       win32_pthread *self = (win32_pthread *) arg;
+       void       *outcome;
+
+       outcome = self->routine(self->arg);
+       self->result = outcome;
+
+       return 0;
+}
+
+/*
+ * Partial pthread_create() for Windows built on _beginthreadex.
+ *
+ * On success stores a newly allocated win32_pthread handle in *thread and
+ * returns 0.  On failure returns an errno value and leaves nothing
+ * allocated.  attr is ignored.
+ */
+static int
+pthread_create(pthread_t *thread,
+                          pthread_attr_t *attr,
+                          void * (*start_routine)(void *),
+                          void *arg)
+{
+       int                             save_errno;
+       win32_pthread   *th;
+
+       th = (win32_pthread *) malloc(sizeof(win32_pthread));
+       if (th == NULL)
+               return ENOMEM;
+       th->routine = start_routine;
+       th->arg = arg;
+       th->result = NULL;
+
+       th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
+       if (th->handle == NULL)
+       {
+               /* save errno first: free() may overwrite it */
+               save_errno = errno;
+               free(th);
+               return save_errno;
+       }
+
+       *thread = th;
+       return 0;
+}
+
+/*
+ * Partial pthread_join() for Windows: waits for the thread to finish,
+ * hands back its result through thread_return when that is not NULL, then
+ * closes the thread handle and frees th.  Returns 0 on success, EINVAL for
+ * an invalid handle, or the errno set by _dosmaperr when the wait fails (in
+ * which case th is left untouched).
+ */
+static int
+pthread_join(pthread_t th, void **thread_return)
+{
+       if (th == NULL || th->handle == NULL)
+       {
+               errno = EINVAL;
+               return EINVAL;
+       }
+
+       if (WaitForSingleObject(th->handle, INFINITE) == WAIT_OBJECT_0)
+       {
+               if (thread_return != NULL)
+                       *thread_return = th->result;
+
+               CloseHandle(th->handle);
+               free(th);
+               return 0;
+       }
+
+       _dosmaperr(GetLastError());
+       return errno;
+}
+
 #endif   /* WIN32 */
index 5c30e8499f5f80277eb095356ad23a98e7fd0368..c34f7acbbb9c644a32cb1011b11b5478e8da21f0 100644 (file)
@@ -1,4 +1,4 @@
-<!-- $PostgreSQL: pgsql/doc/src/sgml/pgbench.sgml,v 1.8 2009/05/07 22:01:18 tgl Exp $ -->
+<!-- $PostgreSQL: pgsql/doc/src/sgml/pgbench.sgml,v 1.9 2009/08/03 15:18:14 ishii Exp $ -->
 
 <sect1 id="pgbench">
  <title>pgbench</title>
@@ -171,6 +171,14 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
        sessions.  Default is 1.
       </entry>
      </row>
+     <row>
+      <entry><literal>-j</literal> <replaceable>threads</></entry>
+      <entry>
+       Number of worker threads. Clients are divided equally among these
+       threads, and each client is executed by its assigned thread. The number
+       of clients must be a multiple of the number of threads. Default is 1.
+      </entry>
+     </row>
      <row>
       <entry><literal>-t</literal> <replaceable>transactions</></entry>
       <entry>