granicus.if.org Git - postgresql/commitdiff
pgbench: refactor handling of stats tracking
author Alvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 29 Jan 2016 12:05:08 +0000 (13:05 +0100)
committer Alvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 29 Jan 2016 12:05:08 +0000 (13:05 +0100)
This doesn't add any functionality but just shuffles things around so
that it can be reused and improved later.

Author: Fabien Coelho
Reviewed-by: Michael Paquier, Álvaro Herrera
src/bin/pgbench/pgbench.c

index d5f242c23fe64d9b26af32cbec8801b19f26f3e2..44da3d19c19c36c0636c304cc56d70c5d3e5332f 100644 (file)
@@ -166,10 +166,8 @@ int                        agg_interval;           /* log aggregates instead of individual
                                                                 * transactions */
 int                    progress = 0;           /* thread progress report every this seconds */
 bool           progress_timestamp = false; /* progress report with Unix time */
-int                    progress_nclients = 0;          /* number of clients for progress
-                                                                                * report */
-int                    progress_nthreads = 0;          /* number of threads for progress
-                                                                                * report */
+int                    nclients = 1;           /* number of clients */
+int                    nthreads = 1;           /* number of threads */
 bool           is_connect;                     /* establish connection for each transaction */
 bool           is_latencies;           /* report per-command latencies */
 int                    main_pid;                       /* main process id used in log filename */
@@ -192,6 +190,35 @@ typedef struct
 #define MAX_SCRIPTS            128             /* max number of SQL scripts allowed */
 #define SHELL_COMMAND_SIZE     256 /* maximum size allowed for shell command */
 
+/*
+ * Simple data structure to keep stats about something.
+ *
+ * XXX probably the first value should be kept and used as an offset for
+ * better numerical stability...
+ */
+typedef struct SimpleStats
+{
+       int64           count;                  /* how many values were encountered */
+       double          min;                    /* the minimum seen */
+       double          max;                    /* the maximum seen */
+       double          sum;                    /* sum of values */
+       double          sum2;                   /* sum of squared values */
+} SimpleStats;
+
+/*
+ * Data structure to hold various statistics: per-thread stats are maintained
+ * and merged together.
+ */
+typedef struct StatsData
+{
+       long            start_time;             /* interval start time, for aggregates */
+       int64           cnt;                    /* number of transactions */
+       int64           skipped;                /* number of transactions skipped under --rate
+                                                                * and --latency-limit */
+       SimpleStats latency;
+       SimpleStats lag;
+} StatsData;
+
 /*
  * Connection state
  */
@@ -213,10 +240,8 @@ typedef struct
        bool            prepared[MAX_SCRIPTS];  /* whether client prepared the script */
 
        /* per client collected stats */
-       int                     cnt;                    /* xacts count */
+       int64           cnt;                    /* transaction count */
        int                     ecnt;                   /* error count */
-       int64           txn_latencies;  /* cumulated latencies */
-       int64           txn_sqlats;             /* cumulated square latencies */
 } CState;
 
 /*
@@ -228,19 +253,14 @@ typedef struct
        pthread_t       thread;                 /* thread handle */
        CState     *state;                      /* array of CState */
        int                     nstate;                 /* length of state[] */
-       instr_time      start_time;             /* thread start time */
-       instr_time *exec_elapsed;       /* time spent executing cmds (per Command) */
-       int                *exec_count;         /* number of cmd executions (per Command) */
        unsigned short random_state[3];         /* separate randomness for each thread */
        int64           throttle_trigger;               /* previous/next throttling (us) */
 
        /* per thread collected stats */
+       instr_time      start_time;             /* thread start time */
        instr_time      conn_time;
-       int64           throttle_lag;   /* total transaction lag behind throttling */
-       int64           throttle_lag_max;               /* max transaction lag */
-       int64           throttle_latency_skipped;               /* lagging transactions
-                                                                                                * skipped */
-       int64           latency_late;   /* late transactions */
+       StatsData       stats;
+       int64           latency_late;   /* executed but late transactions */
 } TState;
 
 #define INVALID_THREAD         ((pthread_t) 0)
@@ -272,33 +292,14 @@ typedef struct
        char       *argv[MAX_ARGS]; /* command word list */
        int                     cols[MAX_ARGS]; /* corresponding column starting from 1 */
        PgBenchExpr *expr;                      /* parsed expression */
+       SimpleStats stats;                      /* time spent in this command */
 } Command;
 
-typedef struct
-{
-
-       long            start_time;             /* when does the interval start */
-       int                     cnt;                    /* number of transactions */
-       int                     skipped;                /* number of transactions skipped under --rate
-                                                                * and --latency-limit */
-
-       double          min_latency;    /* min/max latencies */
-       double          max_latency;
-       double          sum_latency;    /* sum(latency), sum(latency^2) - for
-                                                                * estimates */
-       double          sum2_latency;
-
-       double          min_lag;
-       double          max_lag;
-       double          sum_lag;                /* sum(lag) */
-       double          sum2_lag;               /* sum(lag*lag) */
-} AggVals;
-
 static struct
 {
        const char *name;
-       Command  **commands;
-} sql_script[MAX_SCRIPTS];             /* SQL script files */
+       Command   **commands;
+}      sql_script[MAX_SCRIPTS];        /* SQL script files */
 static int     num_scripts;            /* number of scripts in sql_script[] */
 static int     num_commands = 0;       /* total number of Command structs */
 static int     debug = 0;                      /* debug flag */
@@ -362,8 +363,11 @@ static struct
 static void setalarm(int seconds);
 static void *threadRun(void *arg);
 
+static void processXactStats(TState *thread, CState *st, instr_time *now,
+                                bool skipped, FILE *logfile, StatsData *agg);
 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
-         AggVals *agg, bool skipped);
+         StatsData *agg, bool skipped, double latency, double lag);
+
 
 static void
 usage(void)
@@ -602,6 +606,82 @@ getPoissonRand(TState *thread, int64 center)
        return (int64) (-log(uniform) * ((double) center) + 0.5);
 }
 
+/*
+ * Initialize the given SimpleStats struct to all zeroes
+ */
+static void
+initSimpleStats(SimpleStats *ss)
+{
+       memset(ss, 0, sizeof(SimpleStats));
+}
+
+/*
+ * Accumulate one value into a SimpleStats struct.
+ */
+static void
+addToSimpleStats(SimpleStats *ss, double val)
+{
+       if (ss->count == 0 || val < ss->min)
+               ss->min = val;
+       if (ss->count == 0 || val > ss->max)
+               ss->max = val;
+       ss->count++;
+       ss->sum += val;
+       ss->sum2 += val * val;
+}
+
+/*
+ * Merge two SimpleStats objects
+ */
+static void
+mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
+{
+       if (acc->count == 0 || ss->min < acc->min)
+               acc->min = ss->min;
+       if (acc->count == 0 || ss->max > acc->max)
+               acc->max = ss->max;
+       acc->count += ss->count;
+       acc->sum += ss->sum;
+       acc->sum2 += ss->sum2;
+}
+
+/*
+ * Initialize a StatsData struct to mostly zeroes, with its start time set to
+ * the given value.
+ */
+static void
+initStats(StatsData *sd, double start_time)
+{
+       sd->start_time = start_time;
+       sd->cnt = 0;
+       sd->skipped = 0;
+       initSimpleStats(&sd->latency);
+       initSimpleStats(&sd->lag);
+}
+
+/*
+ * Accumulate one additional item into the given stats object.
+ */
+static void
+accumStats(StatsData *stats, bool skipped, double lat, double lag)
+{
+       stats->cnt++;
+
+       if (skipped)
+       {
+               /* no latency to record on skipped transactions */
+               stats->skipped++;
+       }
+       else
+       {
+               addToSimpleStats(&stats->latency, lat);
+
+               /* and possibly the same for schedule lag */
+               if (throttle_delay)
+                       addToSimpleStats(&stats->lag, lag);
+       }
+}
+
 /* call PQexec() and exit() on failure */
 static void
 executeStatement(PGconn *con, const char *sql)
@@ -1121,30 +1201,6 @@ clientDone(CState *st, bool ok)
        return false;                           /* always false */
 }
 
-static void
-agg_vals_init(AggVals *aggs, instr_time start)
-{
-       /* basic counters */
-       aggs->cnt = 0;                          /* number of transactions (includes skipped) */
-       aggs->skipped = 0;                      /* xacts skipped under --rate --latency-limit */
-
-       aggs->sum_latency = 0;          /* SUM(latency) */
-       aggs->sum2_latency = 0;         /* SUM(latency*latency) */
-
-       /* min and max transaction duration */
-       aggs->min_latency = 0;
-       aggs->max_latency = 0;
-
-       /* schedule lag counters */
-       aggs->sum_lag = 0;
-       aggs->sum2_lag = 0;
-       aggs->min_lag = 0;
-       aggs->max_lag = 0;
-
-       /* start of the current interval */
-       aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
-}
-
 static int
 chooseScript(TState *thread)
 {
@@ -1156,7 +1212,7 @@ chooseScript(TState *thread)
 
 /* return false iff client should be disconnected */
 static bool
-doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
+doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg)
 {
        PGresult   *res;
        Command   **commands;
@@ -1210,11 +1266,8 @@ top:
                        now_us = INSTR_TIME_GET_MICROSEC(now);
                        while (thread->throttle_trigger < now_us - latency_limit)
                        {
-                               thread->throttle_latency_skipped++;
-
-                               if (logfile)
-                                       doLog(thread, st, logfile, &now, agg, true);
-
+                               processXactStats(thread, st, &now, true, logfile, agg);
+                               /* next rendez-vous */
                                wait = getPoissonRand(thread, throttle_delay);
                                thread->throttle_trigger += wait;
                                st->txn_scheduled = thread->throttle_trigger;
@@ -1231,28 +1284,13 @@ top:
 
        if (st->sleeping)
        {                                                       /* are we sleeping? */
-               int64           now_us;
-
                if (INSTR_TIME_IS_ZERO(now))
                        INSTR_TIME_SET_CURRENT(now);
-               now_us = INSTR_TIME_GET_MICROSEC(now);
-               if (st->txn_scheduled <= now_us)
-               {
-                       /* Done sleeping, go ahead with next command */
-                       st->sleeping = false;
-                       if (st->throttling)
-                       {
-                               /* Measure lag of throttled transaction relative to target */
-                               int64           lag = now_us - st->txn_scheduled;
-
-                               thread->throttle_lag += lag;
-                               if (lag > thread->throttle_lag_max)
-                                       thread->throttle_lag_max = lag;
-                               st->throttling = false;
-                       }
-               }
-               else
+               if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
                        return true;            /* Still sleeping, nothing to do here */
+               /* Else done sleeping, go ahead with next command */
+               st->sleeping = false;
+               st->throttling = false;
        }
 
        if (st->listen)
@@ -1276,47 +1314,22 @@ top:
                 */
                if (is_latencies)
                {
-                       int                     cnum = commands[st->state]->command_num;
-
                        if (INSTR_TIME_IS_ZERO(now))
                                INSTR_TIME_SET_CURRENT(now);
-                       INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
-                                                                 now, st->stmt_begin);
-                       thread->exec_count[cnum]++;
+
+                       /* XXX could use a mutex here, but we choose not to */
+                       addToSimpleStats(&commands[st->state]->stats,
+                                                        INSTR_TIME_GET_DOUBLE(now) -
+                                                        INSTR_TIME_GET_DOUBLE(st->stmt_begin));
                }
 
                /* transaction finished: calculate latency and log the transaction */
                if (commands[st->state + 1] == NULL)
                {
-                       /* only calculate latency if an option is used that needs it */
-                       if (progress || throttle_delay || latency_limit)
-                       {
-                               int64           latency;
-
-                               if (INSTR_TIME_IS_ZERO(now))
-                                       INSTR_TIME_SET_CURRENT(now);
-
-                               latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
-
-                               st->txn_latencies += latency;
-
-                               /*
-                                * XXX In a long benchmark run of high-latency transactions,
-                                * this int64 addition eventually overflows.  For example, 100
-                                * threads running 10s transactions will overflow it in 2.56
-                                * hours.  With a more-typical OLTP workload of .1s
-                                * transactions, overflow would take 256 hours.
-                                */
-                               st->txn_sqlats += latency * latency;
-
-                               /* record over the limit transactions if needed. */
-                               if (latency_limit && latency > latency_limit)
-                                       thread->latency_late++;
-                       }
-
-                       /* record the time it took in the log */
-                       if (logfile)
-                               doLog(thread, st, logfile, &now, agg, false);
+                       if (progress || throttle_delay || latency_limit || logfile)
+                               processXactStats(thread, st, &now, false, logfile, agg);
+                       else
+                               thread->stats.cnt++;
                }
 
                if (commands[st->state]->type == SQL_COMMAND)
@@ -1391,7 +1404,7 @@ top:
                        return clientDone(st, false);
                }
                INSTR_TIME_SET_CURRENT(end);
-               INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
+               INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
        }
 
        /*
@@ -1496,7 +1509,7 @@ top:
                        st->ecnt++;
                }
                else
-                       st->listen = true;              /* flags that should be listened */
+                       st->listen = true;      /* flags that should be listened */
        }
        else if (commands[st->state]->type == META_COMMAND)
        {
@@ -1734,6 +1747,8 @@ top:
                        else    /* succeeded */
                                st->listen = true;
                }
+
+               /* after a meta command, immediately proceed with next command */
                goto top;
        }
 
@@ -1744,12 +1759,9 @@ top:
  * print log entry after completing one transaction.
  */
 static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
-         bool skipped)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
+         StatsData *agg, bool skipped, double latency, double lag)
 {
-       double          lag;
-       double          latency;
-
        /*
         * Skip the log entry if sampling is enabled and this row doesn't belong
         * to the random sample.
@@ -1758,118 +1770,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
                pg_erand48(thread->random_state) > sample_rate)
                return;
 
-       if (INSTR_TIME_IS_ZERO(*now))
-               INSTR_TIME_SET_CURRENT(*now);
-
-       latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
-       if (skipped)
-               lag = latency;
-       else
-               lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
-
        /* should we aggregate the results or not? */
        if (agg_interval > 0)
        {
                /*
-                * Are we still in the same interval? If yes, accumulate the values
-                * (print them otherwise)
+                * Loop until we reach the interval of the current transaction, and
+                * print all the empty intervals in between (this may happen with very
+                * low tps, e.g. --rate=0.1).
                 */
-               if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
-               {
-                       agg->cnt += 1;
-                       if (skipped)
+               while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
+               {
+                       /* print aggregated report to logfile */
+                       fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+                                       agg->start_time,
+                                       agg->cnt,
+                                       agg->latency.sum,
+                                       agg->latency.sum2,
+                                       agg->latency.min,
+                                       agg->latency.max);
+                       if (throttle_delay)
                        {
-                               /*
-                                * there is no latency to record if the transaction was
-                                * skipped
-                                */
-                               agg->skipped += 1;
+                               fprintf(logfile, " %.0f %.0f %.0f %.0f",
+                                               agg->lag.sum,
+                                               agg->lag.sum2,
+                                               agg->lag.min,
+                                               agg->lag.max);
+                               if (latency_limit)
+                                       fprintf(logfile, " " INT64_FORMAT, agg->skipped);
                        }
-                       else
-                       {
-                               agg->sum_latency += latency;
-                               agg->sum2_latency += latency * latency;
-
-                               /* first in this aggregation interval */
-                               if ((agg->cnt == 1) || (latency < agg->min_latency))
-                                       agg->min_latency = latency;
-
-                               if ((agg->cnt == 1) || (latency > agg->max_latency))
-                                       agg->max_latency = latency;
+                       fputc('\n', logfile);
 
-                               /* and the same for schedule lag */
-                               if (throttle_delay)
-                               {
-                                       agg->sum_lag += lag;
-                                       agg->sum2_lag += lag * lag;
-
-                                       if ((agg->cnt == 1) || (lag < agg->min_lag))
-                                               agg->min_lag = lag;
-                                       if ((agg->cnt == 1) || (lag > agg->max_lag))
-                                               agg->max_lag = lag;
-                               }
-                       }
+                       /* reset data and move to next interval */
+                       initStats(agg, agg->start_time + agg_interval);
                }
-               else
-               {
-                       /*
-                        * Loop until we reach the interval of the current transaction
-                        * (and print all the empty intervals in between).
-                        */
-                       while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
-                       {
-                               /*
-                                * This is a non-Windows branch (thanks to the ifdef in
-                                * usage), so we don't need to handle this in a special way
-                                * (see below).
-                                */
-                               fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
-                                               agg->start_time,
-                                               agg->cnt,
-                                               agg->sum_latency,
-                                               agg->sum2_latency,
-                                               agg->min_latency,
-                                               agg->max_latency);
-                               if (throttle_delay)
-                               {
-                                       fprintf(logfile, " %.0f %.0f %.0f %.0f",
-                                                       agg->sum_lag,
-                                                       agg->sum2_lag,
-                                                       agg->min_lag,
-                                                       agg->max_lag);
-                                       if (latency_limit)
-                                               fprintf(logfile, " %d", agg->skipped);
-                               }
-                               fputc('\n', logfile);
-
-                               /* move to the next inteval */
-                               agg->start_time = agg->start_time + agg_interval;
-
-                               /* reset for "no transaction" intervals */
-                               agg->cnt = 0;
-                               agg->skipped = 0;
-                               agg->min_latency = 0;
-                               agg->max_latency = 0;
-                               agg->sum_latency = 0;
-                               agg->sum2_latency = 0;
-                               agg->min_lag = 0;
-                               agg->max_lag = 0;
-                               agg->sum_lag = 0;
-                               agg->sum2_lag = 0;
-                       }
 
-                       /* reset the values to include only the current transaction. */
-                       agg->cnt = 1;
-                       agg->skipped = skipped ? 1 : 0;
-                       agg->min_latency = latency;
-                       agg->max_latency = latency;
-                       agg->sum_latency = skipped ? 0.0 : latency;
-                       agg->sum2_latency = skipped ? 0.0 : latency * latency;
-                       agg->min_lag = lag;
-                       agg->max_lag = lag;
-                       agg->sum_lag = lag;
-                       agg->sum2_lag = lag * lag;
-               }
+               /* accumulate the current transaction */
+               accumStats(agg, skipped, latency, lag);
        }
        else
        {
@@ -1878,21 +1814,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
 
                /* This is more than we really ought to know about instr_time */
                if (skipped)
-                       fprintf(logfile, "%d %d skipped %d %ld %ld",
+                       fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
                                        st->id, st->cnt, st->use_file,
                                        (long) now->tv_sec, (long) now->tv_usec);
                else
-                       fprintf(logfile, "%d %d %.0f %d %ld %ld",
+                       fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
                                        st->id, st->cnt, latency, st->use_file,
                                        (long) now->tv_sec, (long) now->tv_usec);
 #else
 
                /* On Windows, instr_time doesn't provide a timestamp anyway */
                if (skipped)
-                       fprintf(logfile, "%d %d skipped %d 0 0",
+                       fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
                                        st->id, st->cnt, st->use_file);
                else
-                       fprintf(logfile, "%d %d %.0f %d 0 0",
+                       fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
                                        st->id, st->cnt, latency, st->use_file);
 #endif
                if (throttle_delay)
@@ -1901,6 +1837,44 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
        }
 }
 
+/*
+ * Accumulate and report statistics at end of a transaction.
+ *
+ * (This is also called when a transaction is late and thus skipped.)
+ */
+static void
+processXactStats(TState *thread, CState *st, instr_time *now,
+                                bool skipped, FILE *logfile, StatsData *agg)
+{
+       double          latency = 0.0,
+                               lag = 0.0;
+
+       if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
+               INSTR_TIME_SET_CURRENT(*now);
+
+       if (!skipped)
+       {
+               /* compute latency & lag */
+               latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
+               lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+       }
+
+       if (progress || throttle_delay || latency_limit)
+       {
+               accumStats(&thread->stats, skipped, latency, lag);
+
+               /* count transactions over the latency limit, if needed */
+               if (latency_limit && latency > latency_limit)
+                       thread->latency_late++;
+       }
+       else
+               thread->stats.cnt++;
+
+       if (use_log)
+               doLog(thread, st, logfile, now, agg, skipped, latency, lag);
+}
+
+
 /* discard connections */
 static void
 disconnect_all(CState *state, int length)
@@ -2297,6 +2271,7 @@ process_commands(char *buf, const char *source, const int lineno)
        my_commands->command_num = num_commands++;
        my_commands->type = 0;          /* until set */
        my_commands->argc = 0;
+       initSimpleStats(&my_commands->stats);
 
        if (*p == '\\')
        {
@@ -2641,7 +2616,7 @@ process_builtin(const char *tb, const char *source)
 static void
 listAvailableScripts(void)
 {
-       int             i;
+       int                     i;
 
        fprintf(stderr, "Available builtin scripts:\n");
        for (i = 0; i < N_BUILTIN; i++)
@@ -2689,22 +2664,29 @@ addScript(const char *name, Command **commands)
        num_scripts++;
 }
 
+static void
+printSimpleStats(char *prefix, SimpleStats *ss)
+{
+       /* print NaN if no transactions were executed */
+       double          latency = ss->sum / ss->count;
+       double          stddev = sqrt(ss->sum2 / ss->count - latency * latency);
+
+       printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
+       printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
+}
+
 /* print out results */
 static void
-printResults(int64 normal_xacts, int nclients,
-                        TState *threads, int nthreads,
-                        instr_time total_time, instr_time conn_total_time,
-                        int64 total_latencies, int64 total_sqlats,
-                        int64 throttle_lag, int64 throttle_lag_max,
-                        int64 throttle_latency_skipped, int64 latency_late)
+printResults(TState *threads, StatsData *total, instr_time total_time,
+                        instr_time conn_total_time, int latency_late)
 {
        double          time_include,
                                tps_include,
                                tps_exclude;
 
        time_include = INSTR_TIME_GET_DOUBLE(total_time);
-       tps_include = normal_xacts / time_include;
-       tps_exclude = normal_xacts / (time_include -
+       tps_include = total->cnt / time_include;
+       tps_exclude = total->cnt / (time_include -
                                                (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
 
        printf("transaction type: %s\n",
@@ -2716,46 +2698,36 @@ printResults(int64 normal_xacts, int nclients,
        if (duration <= 0)
        {
                printf("number of transactions per client: %d\n", nxacts);
-               printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
-                          normal_xacts, (int64) nxacts * nclients);
+               printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
+                          total->cnt, nxacts * nclients);
        }
        else
        {
                printf("duration: %d s\n", duration);
                printf("number of transactions actually processed: " INT64_FORMAT "\n",
-                          normal_xacts);
+                          total->cnt);
        }
 
        /* Remaining stats are nonsensical if we failed to execute any xacts */
-       if (normal_xacts <= 0)
+       if (total->cnt <= 0)
                return;
 
        if (throttle_delay && latency_limit)
                printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
-                          throttle_latency_skipped,
-                          100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+                          total->skipped,
+                          100.0 * total->skipped / (total->skipped + total->cnt));
 
        if (latency_limit)
-               printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+               printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
                           latency_limit / 1000.0, latency_late,
-                  100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+                          100.0 * latency_late / (total->skipped + total->cnt));
 
        if (throttle_delay || progress || latency_limit)
-       {
-               /* compute and show latency average and standard deviation */
-               double          latency = 0.001 * total_latencies / normal_xacts;
-               double          sqlat = (double) total_sqlats / normal_xacts;
-
-               printf("latency average: %.3f ms\n"
-                          "latency stddev: %.3f ms\n",
-                          latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
-       }
+               printSimpleStats("latency", &total->latency);
        else
-       {
                /* only an average latency computed from the duration is available */
                printf("latency average: %.3f ms\n",
-                          1000.0 * duration * nclients / normal_xacts);
-       }
+                          1000.0 * duration * nclients / total->cnt);
 
        if (throttle_delay)
        {
@@ -2766,7 +2738,7 @@ printResults(int64 normal_xacts, int nclients,
                 * the database load, or the Poisson throttling process.
                 */
                printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
-                          0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+                          0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
        }
 
        printf("tps = %f (including connections establishing)\n", tps_include);
@@ -2785,33 +2757,9 @@ printResults(int64 normal_xacts, int nclients,
                        printf(" - statement latencies in milliseconds:\n");
 
                        for (commands = sql_script[i].commands; *commands != NULL; commands++)
-                       {
-                               Command    *command = *commands;
-                               int                     cnum = command->command_num;
-                               double          total_time;
-                               instr_time      total_exec_elapsed;
-                               int                     total_exec_count;
-                               int                     t;
-
-                               /* Accumulate per-thread data for command */
-                               INSTR_TIME_SET_ZERO(total_exec_elapsed);
-                               total_exec_count = 0;
-                               for (t = 0; t < nthreads; t++)
-                               {
-                                       TState     *thread = &threads[t];
-
-                                       INSTR_TIME_ADD(total_exec_elapsed,
-                                                                  thread->exec_elapsed[cnum]);
-                                       total_exec_count += thread->exec_count[cnum];
-                               }
-
-                               if (total_exec_count > 0)
-                                       total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
-                               else
-                                       total_time = 0.0;
-
-                               printf("\t%f\t%s\n", total_time, command->line);
-                       }
+                               printf("   %11.3f  %s\n",
+                                 1000.0 * (*commands)->stats.sum / (*commands)->stats.count,
+                                          (*commands)->line);
                }
        }
 }
@@ -2860,8 +2808,6 @@ main(int argc, char **argv)
        };
 
        int                     c;
-       int                     nclients = 1;   /* default number of simulated clients */
-       int                     nthreads = 1;   /* default number of threads */
        int                     is_init_mode = 0;               /* initialize mode? */
        int                     is_no_vacuum = 0;               /* no vacuum at all before testing? */
        int                     do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
@@ -2878,13 +2824,8 @@ main(int argc, char **argv)
        instr_time      start_time;             /* start up time */
        instr_time      total_time;
        instr_time      conn_total_time;
-       int64           total_xacts = 0;
-       int64           total_latencies = 0;
-       int64           total_sqlats = 0;
-       int64           throttle_lag = 0;
-       int64           throttle_lag_max = 0;
-       int64           throttle_latency_skipped = 0;
        int64           latency_late = 0;
+       StatsData       stats;
        char       *desc;
 
        int                     i;
@@ -3071,14 +3012,14 @@ main(int argc, char **argv)
                        case 'S':
                                addScript(desc,
                                                  process_builtin(findBuiltin("select-only", &desc),
-                                                                                desc));
+                                                                                 desc));
                                benchmarking_option_set = true;
                                internal_script_used = true;
                                break;
                        case 'N':
                                addScript(desc,
                                                  process_builtin(findBuiltin("simple-update", &desc),
-                                                                                desc));
+                                                                                 desc));
                                benchmarking_option_set = true;
                                internal_script_used = true;
                                break;
@@ -3311,8 +3252,6 @@ main(int argc, char **argv)
         * changed after fork.
         */
        main_pid = (int) getpid();
-       progress_nclients = nclients;
-       progress_nthreads = nthreads;
 
        if (nclients > 1)
        {
@@ -3454,32 +3393,10 @@ main(int argc, char **argv)
                thread->random_state[0] = random();
                thread->random_state[1] = random();
                thread->random_state[2] = random();
-               thread->throttle_latency_skipped = 0;
                thread->latency_late = 0;
+               initStats(&thread->stats, 0.0);
 
                nclients_dealt += thread->nstate;
-
-               if (is_latencies)
-               {
-                       /* Reserve memory for the thread to store per-command latencies */
-                       int                     t;
-
-                       thread->exec_elapsed = (instr_time *)
-                               pg_malloc(sizeof(instr_time) * num_commands);
-                       thread->exec_count = (int *)
-                               pg_malloc(sizeof(int) * num_commands);
-
-                       for (t = 0; t < num_commands; t++)
-                       {
-                               INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
-                               thread->exec_count[t] = 0;
-                       }
-               }
-               else
-               {
-                       thread->exec_elapsed = NULL;
-                       thread->exec_count = NULL;
-               }
        }
 
        /* all clients must be assigned to a thread */
@@ -3522,11 +3439,11 @@ main(int argc, char **argv)
 #endif   /* ENABLE_THREAD_SAFETY */
 
        /* wait for threads and accumulate results */
+       initStats(&stats, 0.0);
        INSTR_TIME_SET_ZERO(conn_total_time);
        for (i = 0; i < nthreads; i++)
        {
                TState     *thread = &threads[i];
-               int                     j;
 
 #ifdef ENABLE_THREAD_SAFETY
                if (threads[i].thread == INVALID_THREAD)
@@ -3539,21 +3456,13 @@ main(int argc, char **argv)
                (void) threadRun(thread);
 #endif   /* ENABLE_THREAD_SAFETY */
 
-               /* thread level stats */
-               throttle_lag += thread->throttle_lag;
-               throttle_latency_skipped += threads->throttle_latency_skipped;
+               /* aggregate thread level stats */
+               mergeSimpleStats(&stats.latency, &thread->stats.latency);
+               mergeSimpleStats(&stats.lag, &thread->stats.lag);
+               stats.cnt += thread->stats.cnt;
+               stats.skipped += thread->stats.skipped;
                latency_late += thread->latency_late;
-               if (throttle_lag_max > thread->throttle_lag_max)
-                       throttle_lag_max = thread->throttle_lag_max;
                INSTR_TIME_ADD(conn_total_time, thread->conn_time);
-
-               /* client-level stats */
-               for (j = 0; j < thread->nstate; j++)
-               {
-                       total_xacts += thread->state[j].cnt;
-                       total_latencies += thread->state[j].txn_latencies;
-                       total_sqlats += thread->state[j].txn_sqlats;
-               }
        }
        disconnect_all(state, nclients);
 
@@ -3569,10 +3478,7 @@ main(int argc, char **argv)
         */
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
-       printResults(total_xacts, nclients, threads, nthreads,
-                                total_time, conn_total_time, total_latencies, total_sqlats,
-                                throttle_lag, throttle_lag_max, throttle_latency_skipped,
-                                latency_late);
+       printResults(threads, &stats, total_time, conn_total_time, latency_late);
 
        return 0;
 }
@@ -3593,13 +3499,8 @@ threadRun(void *arg)
        int64           thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
        int64           last_report = thread_start;
        int64           next_report = last_report + (int64) progress * 1000000;
-       int64           last_count = 0,
-                               last_lats = 0,
-                               last_sqlats = 0,
-                               last_lags = 0,
-                               last_skipped = 0;
-
-       AggVals         aggs;
+       StatsData       last,
+                               aggs;
 
        /*
         * Initialize throttling rate target for all of the thread's clients.  It
@@ -3609,8 +3510,6 @@ threadRun(void *arg)
         */
        INSTR_TIME_SET_CURRENT(start);
        thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
-       thread->throttle_lag = 0;
-       thread->throttle_lag_max = 0;
 
        INSTR_TIME_SET_ZERO(thread->conn_time);
 
@@ -3647,7 +3546,8 @@ threadRun(void *arg)
        INSTR_TIME_SET_CURRENT(thread->conn_time);
        INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
-       agg_vals_init(&aggs, thread->start_time);
+       initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
+       last = aggs;
 
        /* send start up queries in async manner */
        for (i = 0; i < nstate; i++)
@@ -3661,7 +3561,7 @@ threadRun(void *arg)
                if (debug)
                        fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
                                        sql_script[st->use_file].name);
-               if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+               if (!doCustom(thread, st, logfile, &aggs))
                        remains--;                      /* I've aborted */
 
                if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3800,7 +3700,7 @@ threadRun(void *arg)
                        if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
                                                        || commands[st->state]->type == META_COMMAND))
                        {
-                               if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+                               if (!doCustom(thread, st, logfile, &aggs))
                                        remains--;      /* I've aborted */
                        }
 
@@ -3825,11 +3725,7 @@ threadRun(void *arg)
                        if (now >= next_report)
                        {
                                /* generate and show report */
-                               int64           count = 0,
-                                                       lats = 0,
-                                                       sqlats = 0,
-                                                       lags = 0,
-                                                       skipped = 0;
+                               StatsData       cur;
                                int64           run = now - last_report;
                                double          tps,
                                                        total_run,
@@ -3850,25 +3746,24 @@ threadRun(void *arg)
                                 * (If a read from a 64-bit integer is not atomic, you might
                                 * get a "torn" read and completely bogus latencies though!)
                                 */
-                               for (i = 0; i < progress_nclients; i++)
+                               initStats(&cur, 0.0);
+                               for (i = 0; i < nthreads; i++)
                                {
-                                       count += state[i].cnt;
-                                       lats += state[i].txn_latencies;
-                                       sqlats += state[i].txn_sqlats;
-                               }
-
-                               for (i = 0; i < progress_nthreads; i++)
-                               {
-                                       skipped += thread[i].throttle_latency_skipped;
-                                       lags += thread[i].throttle_lag;
+                                       mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
+                                       mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
+                                       cur.cnt += thread[i].stats.cnt;
+                                       cur.skipped += thread[i].stats.skipped;
                                }
 
                                total_run = (now - thread_start) / 1000000.0;
-                               tps = 1000000.0 * (count - last_count) / run;
-                               latency = 0.001 * (lats - last_lats) / (count - last_count);
-                               sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+                               tps = 1000000.0 * (cur.cnt - last.cnt) / run;
+                               latency = 0.001 * (cur.latency.sum - last.latency.sum) /
+                                       (cur.cnt - last.cnt);
+                               sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
+                                       / (cur.cnt - last.cnt);
                                stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-                               lag = 0.001 * (lags - last_lags) / (count - last_count);
+                               lag = 0.001 * (cur.lag.sum - last.lag.sum) /
+                                       (cur.cnt - last.cnt);
 
                                if (progress_timestamp)
                                        sprintf(tbuf, "%.03f s",
@@ -3885,16 +3780,12 @@ threadRun(void *arg)
                                        fprintf(stderr, ", lag %.3f ms", lag);
                                        if (latency_limit)
                                                fprintf(stderr, ", " INT64_FORMAT " skipped",
-                                                               skipped - last_skipped);
+                                                               cur.skipped - last.skipped);
                                }
                                fprintf(stderr, "\n");
 
-                               last_count = count;
-                               last_lats = lats;
-                               last_sqlats = sqlats;
-                               last_lags = lags;
+                               last = cur;
                                last_report = now;
-                               last_skipped = skipped;
 
                                /*
                                 * Ensure that the next report is in the future, in case
@@ -3914,7 +3805,14 @@ done:
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
        if (logfile)
+       {
+               if (agg_interval)
+               {
+                       /* log aggregated but not yet reported transactions */
+                       doLog(thread, state, logfile, &end, &aggs, false, 0, 0);
+               }
                fclose(logfile);
+       }
        return NULL;
 }