]> granicus.if.org Git - postgresql/blobdiff - contrib/pgbench/pgbench.c
Avoid PQisBusy/PQconsumeInput busy loop in case of PQisBusy returning
[postgresql] / contrib / pgbench / pgbench.c
index c8aa0c14b724a0a86ca6f1e789c734e204360950..2eadf6254194493378cbb169c3111ebbcbf400d7 100644 (file)
@@ -1,10 +1,10 @@
 /*
- * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.12 2001/10/25 05:49:19 momjian Exp $
+ * $Header: /cvsroot/pgsql/contrib/pgbench/pgbench.c,v 1.20 2002/10/07 05:10:02 ishii Exp $
  *
  * pgbench: a simple TPC-B like benchmark program for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2000  Tatsuo Ishii
+ * Copyright (c) 2000-2002     Tatsuo Ishii
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -39,7 +39,7 @@
 
 /* for getrlimit */
 #include <sys/resource.h>
-#endif  /* WIN32 */
+#endif   /* ! WIN32 */
 
 /********************************************************************
  * some configurable parameters */
@@ -64,10 +64,14 @@ int                 tps = 1;
 #define ntellers       10
 #define naccounts      100000
 
-int                    remains;                        /* number of remained clients */
+FILE      *LOGFILE = NULL;
+
+bool           use_log;                        /* log transaction latencies to a file */
+
+int                    remains;                        /* number of remaining clients */
 
 int                    is_connect;                     /* establish connection  for each
-                                                                * transactoin */
+                                                                * transaction */
 
 char      *pghost = "";
 char      *pgport = NULL;
@@ -80,22 +84,24 @@ char           *dbName;
 typedef struct
 {
        PGconn     *con;                        /* connection handle to DB */
+       int                     id;                             /* client No. */
        int                     state;                  /* state No. */
        int                     cnt;                    /* xacts count */
        int                     ecnt;                   /* error count */
-       int                     listen;                 /* none 0 indicates that an async query
-                                                                * has been sent */
+       int                     listen;                 /* 0 indicates that an async query has
+                                                                * been sent */
        int                     aid;                    /* account id for this transaction */
        int                     bid;                    /* branch id for this transaction */
        int                     tid;                    /* teller id for this transaction */
        int                     delta;
        int                     abalance;
-}                      CState;
+       struct timeval txn_begin;       /* used for measuring latencies */
+}      CState;
 
 static void
 usage()
 {
-       fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-U login][-P password][-d][dbname]\n");
+       fprintf(stderr, "usage: pgbench [-h hostname][-p port][-c nclients][-t ntransactions][-s scaling_factor][-n][-C][-v][-S][-N][-l][-U login][-P password][-d][dbname]\n");
        fprintf(stderr, "(initialize mode): pgbench -i [-h hostname][-p port][-s scaling_factor][-U login][-P password][-d][dbname]\n");
 }
 
@@ -168,7 +174,7 @@ check(CState * state, PGresult *res, int n, int good)
 
 /* process a transaction */
 static void
-doOne(CState * state, int n, int debug)
+doOne(CState * state, int n, int debug, int ttype)
 {
        char            sql[256];
        PGresult   *res;
@@ -178,17 +184,16 @@ doOne(CState * state, int n, int debug)
        {                                                       /* are we receiver? */
                if (debug)
                        fprintf(stderr, "client %d receiving\n", n);
-               while (PQisBusy(st->con) == TRUE)
-               {
-                       if (!PQconsumeInput(st->con))
-                       {                                       /* there's something wrong */
-                               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
-                               remains--;              /* I've aborted */
-                               PQfinish(st->con);
-                               st->con = NULL;
-                               return;
-                       }
+               if (!PQconsumeInput(st->con))
+               {                                               /* there's something wrong */
+                       fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
+                       remains--;                      /* I've aborted */
+                       PQfinish(st->con);
+                       st->con = NULL;
+                       return;
                }
+               if (PQisBusy(st->con))
+                       return;                         /* don't have the whole result yet */
 
                switch (st->state)
                {
@@ -235,6 +240,23 @@ doOne(CState * state, int n, int debug)
                                discard_response(st);
                                break;
                        case 6:                         /* response to "end" */
+
+                               /*
+                                * transaction finished: record the time it took in the
+                                * log
+                                */
+                               if (use_log)
+                               {
+                                       long long       diff;
+                                       struct timeval now;
+
+                                       gettimeofday(&now, 0);
+                                       diff = (now.tv_sec - st->txn_begin.tv_sec) * 1000000 +
+                                               (now.tv_usec - st->txn_begin.tv_usec);
+
+                                       fprintf(LOGFILE, "%d %d %lld\n", st->id, st->cnt, diff);
+                               }
+
                                res = PQgetResult(st->con);
                                if (check(state, res, n, PGRES_COMMAND_OK))
                                        return;
@@ -249,7 +271,7 @@ doOne(CState * state, int n, int debug)
 
                                if (++st->cnt >= nxacts)
                                {
-                                       remains--;      /* I've done */
+                                       remains--;      /* I'm done */
                                        if (st->con != NULL)
                                        {
                                                PQfinish(st->con);
@@ -287,23 +309,31 @@ doOne(CState * state, int n, int debug)
                        st->bid = getrand(1, nbranches * tps);
                        st->tid = getrand(1, ntellers * tps);
                        st->delta = getrand(1, 1000);
+                       if (use_log)
+                               gettimeofday(&(st->txn_begin), 0);
                        break;
                case 1:
-                       sprintf(sql, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
+                       snprintf(sql, 256, "update accounts set abalance = abalance + %d where aid = %d\n", st->delta, st->aid);
                        break;
                case 2:
-                       sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
+                       snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
                        break;
                case 3:
-                       sprintf(sql, "update tellers set tbalance = tbalance + %d where tid = %d\n",
-                                       st->delta, st->tid);
-                       break;
+                       if (ttype == 0)
+                       {
+                               snprintf(sql, 256, "update tellers set tbalance = tbalance + %d where tid = %d\n",
+                                                st->delta, st->tid);
+                               break;
+                       }
                case 4:
-                       sprintf(sql, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
-                       break;
+                       if (ttype == 0)
+                       {
+                               snprintf(sql, 256, "update branches set bbalance = bbalance + %d where bid = %d", st->delta, st->bid);
+                               break;
+                       }
                case 5:
-                       sprintf(sql, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
-                                       st->tid, st->bid, st->aid, st->delta);
+                       snprintf(sql, 256, "insert into history(tid,bid,aid,delta,mtime) values(%d,%d,%d,%d,'now')",
+                                        st->tid, st->bid, st->aid, st->delta);
                        break;
                case 6:
                        strcpy(sql, "end");
@@ -320,7 +350,7 @@ doOne(CState * state, int n, int debug)
        }
        else
        {
-               st->listen++;                   /* flags that should be listned */
+               st->listen++;                   /* flags that should be listened */
        }
 }
 
@@ -336,17 +366,16 @@ doSelectOnly(CState * state, int n, int debug)
        {                                                       /* are we receiver? */
                if (debug)
                        fprintf(stderr, "client %d receiving\n", n);
-               while (PQisBusy(st->con) == TRUE)
-               {
-                       if (!PQconsumeInput(st->con))
-                       {                                       /* there's something wrong */
-                               fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
-                               remains--;              /* I've aborted */
-                               PQfinish(st->con);
-                               st->con = NULL;
-                               return;
-                       }
+               if (!PQconsumeInput(st->con))
+               {                                               /* there's something wrong */
+                       fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", n, st->state);
+                       remains--;                      /* I've aborted */
+                       PQfinish(st->con);
+                       st->con = NULL;
+                       return;
                }
+               if (PQisBusy(st->con))
+                       return;                         /* don't have the whole result yet */
 
                switch (st->state)
                {
@@ -399,7 +428,7 @@ doSelectOnly(CState * state, int n, int debug)
        {
                case 0:
                        st->aid = getrand(1, naccounts * tps);
-                       sprintf(sql, "select abalance from accounts where aid = %d", st->aid);
+                       snprintf(sql, 256, "select abalance from accounts where aid = %d", st->aid);
                        break;
        }
 
@@ -414,7 +443,7 @@ doSelectOnly(CState * state, int n, int debug)
        }
        else
        {
-               st->listen++;                   /* flags that should be listned */
+               st->listen++;                   /* flags that should be listened */
        }
 }
 
@@ -433,7 +462,7 @@ disconnect_all(CState * state)
 
 /* create tables and setup data */
 static void
-init()
+init(void)
 {
        PGconn     *con;
        PGresult   *res;
@@ -473,7 +502,7 @@ init()
 
        for (i = 0; i < nbranches * tps; i++)
        {
-               sprintf(sql, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
+               snprintf(sql, 256, "insert into branches(bid,bbalance) values(%d,0)", i + 1);
                res = PQexec(con, sql);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
@@ -485,8 +514,8 @@ init()
 
        for (i = 0; i < ntellers * tps; i++)
        {
-               sprintf(sql, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
-                               ,i + 1, i / ntellers + 1);
+               snprintf(sql, 256, "insert into tellers(tid,bid,tbalance) values (%d,%d,0)"
+                                ,i + 1, i / ntellers + 1);
                res = PQexec(con, sql);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
@@ -523,7 +552,7 @@ init()
                        PQclear(res);
                }
 
-               sprintf(sql, "%d\t%d\t%d\t\n", j, j / naccounts, 0);
+               snprintf(sql, 256, "%d\t%d\t%d\t\n", j, j / naccounts, 0);
                if (PQputline(con, sql))
                {
                        fprintf(stderr, "PQputline failed\n");
@@ -549,6 +578,8 @@ init()
                                exit(1);
                        }
 
+#ifdef NOT_USED
+
                        /*
                         * do a checkpoint to purge the old WAL logs
                         */
@@ -558,6 +589,7 @@ init()
                                fprintf(stderr, "%s", PQerrorMessage(con));
                                exit(1);
                        }
+#endif   /* NOT_USED */
                }
        }
 
@@ -585,6 +617,7 @@ printResults(
                                t2;
        int                     i;
        int                     normal_xacts = 0;
+       char       *s;
 
        for (i = 0; i < nclients; i++)
                normal_xacts += state[i].cnt;
@@ -595,13 +628,20 @@ printResults(
        t2 = (tv3->tv_sec - tv2->tv_sec) * 1000000.0 + (tv3->tv_usec - tv2->tv_usec);
        t2 = normal_xacts * 1000000.0 / t2;
 
-       printf("transaction type: %s\n", ttype == 0 ? "TPC-B (sort of)" : "SELECT only");
+       if (ttype == 0)
+               s = "TPC-B (sort of)";
+       else if (ttype == 2)
+               s = "Update only accounts";
+       else
+               s = "SELECT only";
+
+       printf("transaction type: %s\n", s);
        printf("scaling factor: %d\n", tps);
        printf("number of clients: %d\n", nclients);
        printf("number of transactions per client: %d\n", nxacts);
        printf("number of transactions actually processed: %d/%d\n", normal_xacts, nxacts * nclients);
-       printf("tps = %f(including connections establishing)\n", t1);
-       printf("tps = %f(excluding connections establishing)\n", t2);
+       printf("tps = %f (including connections establishing)\n", t1);
+       printf("tps = %f (excluding connections establishing)\n", t2);
 }
 
 
@@ -619,9 +659,10 @@ main(int argc, char **argv)
        int                     is_full_vacuum = 0;             /* do full vacuum before testing? */
        int                     debug = 0;              /* debug flag */
        int                     ttype = 0;              /* transaction type. 0: TPC-B, 1: SELECT
-                                                                * only */
+                                                                * only, 2: skip update of branches and
+                                                                * tellers */
 
-       static CState state[MAXCLIENTS];        /* clients status */
+       static CState *state;           /* status of clients */
 
        struct timeval tv1;                     /* start up time */
        struct timeval tv2;                     /* after establishing all connections to
@@ -641,7 +682,7 @@ main(int argc, char **argv)
        PGconn     *con;
        PGresult   *res;
 
-       while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CS")) != EOF)
+       while ((c = getopt(argc, argv, "ih:nvp:dc:t:s:U:P:CNSl")) != -1)
        {
                switch (c)
                {
@@ -666,11 +707,14 @@ main(int argc, char **argv)
                        case 'S':
                                ttype = 1;
                                break;
+                       case 'N':
+                               ttype = 2;
+                               break;
                        case 'c':
                                nclients = atoi(optarg);
                                if (nclients <= 0 || nclients > MAXCLIENTS)
                                {
-                                       fprintf(stderr, "wrong number of clients: %d\n", nclients);
+                                       fprintf(stderr, "invalid number of clients: %d\n", nclients);
                                        exit(1);
                                }
 #ifndef __CYGWIN__
@@ -680,7 +724,7 @@ main(int argc, char **argv)
 #else                                                  /* but BSD doesn't ... */
                                if (getrlimit(RLIMIT_OFILE, &rlim) == -1)
                                {
-#endif  /* HAVE_RLIMIT_NOFILE */
+#endif   /* HAVE_RLIMIT_NOFILE */
                                        fprintf(stderr, "getrlimit failed. reason: %s\n", strerror(errno));
                                        exit(1);
                                }
@@ -690,7 +734,7 @@ main(int argc, char **argv)
                                        fprintf(stderr, "Use limit/ulimt to increase the limit before using pgbench.\n");
                                        exit(1);
                                }
-#endif  /* #ifndef __CYGWIN__ */
+#endif   /* #ifndef __CYGWIN__ */
                                break;
                        case 'C':
                                is_connect = 1;
@@ -699,7 +743,7 @@ main(int argc, char **argv)
                                tps = atoi(optarg);
                                if (tps <= 0)
                                {
-                                       fprintf(stderr, "wrong scaling factor: %d\n", tps);
+                                       fprintf(stderr, "invalid scaling factor: %d\n", tps);
                                        exit(1);
                                }
                                break;
@@ -707,7 +751,7 @@ main(int argc, char **argv)
                                nxacts = atoi(optarg);
                                if (nxacts <= 0)
                                {
-                                       fprintf(stderr, "wrong number of transactions: %d\n", nxacts);
+                                       fprintf(stderr, "invalid number of transactions: %d\n", nxacts);
                                        exit(1);
                                }
                                break;
@@ -717,6 +761,9 @@ main(int argc, char **argv)
                        case 'P':
                                pwd = optarg;
                                break;
+                       case 'l':
+                               use_log = true;
+                               break;
                        default:
                                usage();
                                exit(1);
@@ -741,6 +788,23 @@ main(int argc, char **argv)
 
        remains = nclients;
 
+       state = (CState *) malloc(sizeof(*state) * nclients);
+       memset(state, 0, sizeof(*state));
+
+       if (use_log)
+       {
+               char            logpath[64];
+
+               snprintf(logpath, 64, "pgbench_log.%d", getpid());
+               LOGFILE = fopen(logpath, "w");
+
+               if (LOGFILE == NULL)
+               {
+                       fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno));
+                       exit(1);
+               }
+       }
+
        if (debug)
        {
                printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n",
@@ -840,6 +904,7 @@ main(int argc, char **argv)
                /* make connections to the database */
                for (i = 0; i < nclients; i++)
                {
+                       state[i].id = i;
                        if ((state[i].con = doConnect()) == NULL)
                                exit(1);
                }
@@ -848,11 +913,11 @@ main(int argc, char **argv)
        /* time after connections set up */
        gettimeofday(&tv2, 0);
 
-       /* send start up quries in async manner */
+       /* send start up queries in async manner */
        for (i = 0; i < nclients; i++)
        {
-               if (ttype == 0)
-                       doOne(state, i, debug);
+               if (ttype == 0 || ttype == 2)
+                       doOne(state, i, debug, ttype);
                else if (ttype == 1)
                        doSelectOnly(state, i, debug);
        }
@@ -865,6 +930,8 @@ main(int argc, char **argv)
                        /* get end time */
                        gettimeofday(&tv3, 0);
                        printResults(ttype, state, &tv1, &tv2, &tv3);
+                       if (LOGFILE)
+                               fclose(LOGFILE);
                        exit(0);
                }
 
@@ -879,7 +946,7 @@ main(int argc, char **argv)
 
                                if (sock < 0)
                                {
-                                       fprintf(stderr, "Client %d: PQsock failed\n", i);
+                                       fprintf(stderr, "Client %d: PQsocket failed\n", i);
                                        disconnect_all(state);
                                        exit(1);
                                }
@@ -915,8 +982,8 @@ main(int argc, char **argv)
                {
                        if (state[i].con && FD_ISSET(PQsocket(state[i].con), &input_mask))
                        {
-                               if (ttype == 0)
-                                       doOne(state, i, debug);
+                               if (ttype == 0 || ttype == 2)
+                                       doOne(state, i, debug, ttype);
                                else if (ttype == 1)
                                        doSelectOnly(state, i, debug);
                        }