* transactions */
int progress = 0; /* thread progress report every this seconds */
bool progress_timestamp = false; /* progress report with Unix time */
-int progress_nclients = 0; /* number of clients for progress
- * report */
-int progress_nthreads = 0; /* number of threads for progress
- * report */
+int nclients = 1; /* number of clients */
+int nthreads = 1; /* number of threads */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
#define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
+/*
+ * Simple data structure to keep stats about something.
+ *
+ * XXX probably the first value should be kept and used as an offset for
+ * better numerical stability...
+ */
+typedef struct SimpleStats
+{
+ int64 count; /* how many values were encountered */
+ double min; /* the minimum seen */
+ double max; /* the maximum seen */
+ double sum; /* sum of values */
+ double sum2; /* sum of squared values */
+} SimpleStats;
+
+/*
+ * Data structure to hold various statistics: per-thread stats are maintained
+ * and merged together.
+ */
+typedef struct StatsData
+{
+ long start_time; /* interval start time, for aggregates */
+ int64 cnt; /* number of transactions */
+ int64 skipped; /* number of transactions skipped under --rate
+ * and --latency-limit */
+ SimpleStats latency;
+ SimpleStats lag;
+} StatsData;
+
/*
* Connection state
*/
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
- int cnt; /* xacts count */
+ int64 cnt; /* transaction count */
int ecnt; /* error count */
- int64 txn_latencies; /* cumulated latencies */
- int64 txn_sqlats; /* cumulated square latencies */
} CState;
/*
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state[] */
- instr_time start_time; /* thread start time */
- instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
- int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
/* per thread collected stats */
+ instr_time start_time; /* thread start time */
instr_time conn_time;
- int64 throttle_lag; /* total transaction lag behind throttling */
- int64 throttle_lag_max; /* max transaction lag */
- int64 throttle_latency_skipped; /* lagging transactions
- * skipped */
- int64 latency_late; /* late transactions */
+ StatsData stats;
+ int64 latency_late; /* executed but late transactions */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
char *argv[MAX_ARGS]; /* command word list */
int cols[MAX_ARGS]; /* corresponding column starting from 1 */
PgBenchExpr *expr; /* parsed expression */
+ SimpleStats stats; /* time spent in this command */
} Command;
-typedef struct
-{
-
- long start_time; /* when does the interval start */
- int cnt; /* number of transactions */
- int skipped; /* number of transactions skipped under --rate
- * and --latency-limit */
-
- double min_latency; /* min/max latencies */
- double max_latency;
- double sum_latency; /* sum(latency), sum(latency^2) - for
- * estimates */
- double sum2_latency;
-
- double min_lag;
- double max_lag;
- double sum_lag; /* sum(lag) */
- double sum2_lag; /* sum(lag*lag) */
-} AggVals;
-
static struct
{
const char *name;
- Command **commands;
-} sql_script[MAX_SCRIPTS]; /* SQL script files */
+ Command **commands;
+} sql_script[MAX_SCRIPTS]; /* SQL script files */
static int num_scripts; /* number of scripts in sql_script[] */
static int num_commands = 0; /* total number of Command structs */
static int debug = 0; /* debug flag */
static void setalarm(int seconds);
static void *threadRun(void *arg);
+static void processXactStats(TState *thread, CState *st, instr_time *now,
+ bool skipped, FILE *logfile, StatsData *agg);
static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
- AggVals *agg, bool skipped);
+ StatsData *agg, bool skipped, double latency, double lag);
+
static void
usage(void)
return (int64) (-log(uniform) * ((double) center) + 0.5);
}
+/*
+ * Initialize the given SimpleStats struct to all zeroes
+ */
+static void
+initSimpleStats(SimpleStats *ss)
+{
+ memset(ss, 0, sizeof(SimpleStats));
+}
+
+/*
+ * Accumulate one value into a SimpleStats struct.
+ */
+static void
+addToSimpleStats(SimpleStats *ss, double val)
+{
+ if (ss->count == 0 || val < ss->min)
+ ss->min = val;
+ if (ss->count == 0 || val > ss->max)
+ ss->max = val;
+ ss->count++;
+ ss->sum += val;
+ ss->sum2 += val * val;
+}
+
+/*
+ * Merge two SimpleStats objects
+ */
+static void
+mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
+{
+ if (acc->count == 0 || ss->min < acc->min)
+ acc->min = ss->min;
+ if (acc->count == 0 || ss->max > acc->max)
+ acc->max = ss->max;
+ acc->count += ss->count;
+ acc->sum += ss->sum;
+ acc->sum2 += ss->sum2;
+}
+
+/*
+ * Initialize a StatsData struct to mostly zeroes, with its start time set to
+ * the given value.
+ */
+static void
+initStats(StatsData *sd, double start_time)
+{
+ sd->start_time = start_time;
+ sd->cnt = 0;
+ sd->skipped = 0;
+ initSimpleStats(&sd->latency);
+ initSimpleStats(&sd->lag);
+}
+
+/*
+ * Accumulate one additional item into the given stats object.
+ */
+static void
+accumStats(StatsData *stats, bool skipped, double lat, double lag)
+{
+ stats->cnt++;
+
+ if (skipped)
+ {
+ /* no latency to record on skipped transactions */
+ stats->skipped++;
+ }
+ else
+ {
+ addToSimpleStats(&stats->latency, lat);
+
+ /* and possibly the same for schedule lag */
+ if (throttle_delay)
+ addToSimpleStats(&stats->lag, lag);
+ }
+}
+
/* call PQexec() and exit() on failure */
static void
executeStatement(PGconn *con, const char *sql)
return false; /* always false */
}
-static void
-agg_vals_init(AggVals *aggs, instr_time start)
-{
- /* basic counters */
- aggs->cnt = 0; /* number of transactions (includes skipped) */
- aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
-
- aggs->sum_latency = 0; /* SUM(latency) */
- aggs->sum2_latency = 0; /* SUM(latency*latency) */
-
- /* min and max transaction duration */
- aggs->min_latency = 0;
- aggs->max_latency = 0;
-
- /* schedule lag counters */
- aggs->sum_lag = 0;
- aggs->sum2_lag = 0;
- aggs->min_lag = 0;
- aggs->max_lag = 0;
-
- /* start of the current interval */
- aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
-}
-
static int
chooseScript(TState *thread)
{
/* return false iff client should be disconnected */
static bool
-doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
+doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg)
{
PGresult *res;
Command **commands;
now_us = INSTR_TIME_GET_MICROSEC(now);
while (thread->throttle_trigger < now_us - latency_limit)
{
- thread->throttle_latency_skipped++;
-
- if (logfile)
- doLog(thread, st, logfile, &now, agg, true);
-
+ processXactStats(thread, st, &now, true, logfile, agg);
+ /* next rendez-vous */
wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
if (st->sleeping)
{ /* are we sleeping? */
- int64 now_us;
-
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- if (st->txn_scheduled <= now_us)
- {
- /* Done sleeping, go ahead with next command */
- st->sleeping = false;
- if (st->throttling)
- {
- /* Measure lag of throttled transaction relative to target */
- int64 lag = now_us - st->txn_scheduled;
-
- thread->throttle_lag += lag;
- if (lag > thread->throttle_lag_max)
- thread->throttle_lag_max = lag;
- st->throttling = false;
- }
- }
- else
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
return true; /* Still sleeping, nothing to do here */
+ /* Else done sleeping, go ahead with next command */
+ st->sleeping = false;
+ st->throttling = false;
}
if (st->listen)
*/
if (is_latencies)
{
- int cnum = commands[st->state]->command_num;
-
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
- INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
- now, st->stmt_begin);
- thread->exec_count[cnum]++;
+
+ /* XXX could use a mutex here, but we choose not to */
+ addToSimpleStats(&commands[st->state]->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
/* transaction finished: calculate latency and log the transaction */
if (commands[st->state + 1] == NULL)
{
- /* only calculate latency if an option is used that needs it */
- if (progress || throttle_delay || latency_limit)
- {
- int64 latency;
-
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
-
- latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
-
- st->txn_latencies += latency;
-
- /*
- * XXX In a long benchmark run of high-latency transactions,
- * this int64 addition eventually overflows. For example, 100
- * threads running 10s transactions will overflow it in 2.56
- * hours. With a more-typical OLTP workload of .1s
- * transactions, overflow would take 256 hours.
- */
- st->txn_sqlats += latency * latency;
-
- /* record over the limit transactions if needed. */
- if (latency_limit && latency > latency_limit)
- thread->latency_late++;
- }
-
- /* record the time it took in the log */
- if (logfile)
- doLog(thread, st, logfile, &now, agg, false);
+ if (progress || throttle_delay || latency_limit || logfile)
+ processXactStats(thread, st, &now, false, logfile, agg);
+ else
+ thread->stats.cnt++;
}
if (commands[st->state]->type == SQL_COMMAND)
return clientDone(st, false);
}
INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
}
/*
st->ecnt++;
}
else
- st->listen = true; /* flags that should be listened */
+ st->listen = true; /* flags that should be listened */
}
else if (commands[st->state]->type == META_COMMAND)
{
else /* succeeded */
st->listen = true;
}
+
+ /* after a meta command, immediately proceed with next command */
goto top;
}
* print log entry after completing one transaction.
*/
static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
- bool skipped)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
+ StatsData *agg, bool skipped, double latency, double lag)
{
- double lag;
- double latency;
-
/*
* Skip the log entry if sampling is enabled and this row doesn't belong
* to the random sample.
pg_erand48(thread->random_state) > sample_rate)
return;
- if (INSTR_TIME_IS_ZERO(*now))
- INSTR_TIME_SET_CURRENT(*now);
-
- latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
- if (skipped)
- lag = latency;
- else
- lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
-
/* should we aggregate the results or not? */
if (agg_interval > 0)
{
/*
- * Are we still in the same interval? If yes, accumulate the values
- * (print them otherwise)
+ * Loop until we reach the interval of the current transaction, and
+ * print all the empty intervals in between (this may happen with very
+ * low tps, e.g. --rate=0.1).
*/
- if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
- {
- agg->cnt += 1;
- if (skipped)
+ while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
+ {
+ /* print aggregated report to logfile */
+ fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ agg->start_time,
+ agg->cnt,
+ agg->latency.sum,
+ agg->latency.sum2,
+ agg->latency.min,
+ agg->latency.max);
+ if (throttle_delay)
{
- /*
- * there is no latency to record if the transaction was
- * skipped
- */
- agg->skipped += 1;
+ fprintf(logfile, " %.0f %.0f %.0f %.0f",
+ agg->lag.sum,
+ agg->lag.sum2,
+ agg->lag.min,
+ agg->lag.max);
+ if (latency_limit)
+ fprintf(logfile, " " INT64_FORMAT, agg->skipped);
}
- else
- {
- agg->sum_latency += latency;
- agg->sum2_latency += latency * latency;
-
- /* first in this aggregation interval */
- if ((agg->cnt == 1) || (latency < agg->min_latency))
- agg->min_latency = latency;
-
- if ((agg->cnt == 1) || (latency > agg->max_latency))
- agg->max_latency = latency;
+ fputc('\n', logfile);
- /* and the same for schedule lag */
- if (throttle_delay)
- {
- agg->sum_lag += lag;
- agg->sum2_lag += lag * lag;
-
- if ((agg->cnt == 1) || (lag < agg->min_lag))
- agg->min_lag = lag;
- if ((agg->cnt == 1) || (lag > agg->max_lag))
- agg->max_lag = lag;
- }
- }
+ /* reset data and move to next interval */
+ initStats(agg, agg->start_time + agg_interval);
}
- else
- {
- /*
- * Loop until we reach the interval of the current transaction
- * (and print all the empty intervals in between).
- */
- while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
- {
- /*
- * This is a non-Windows branch (thanks to the ifdef in
- * usage), so we don't need to handle this in a special way
- * (see below).
- */
- fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
- agg->start_time,
- agg->cnt,
- agg->sum_latency,
- agg->sum2_latency,
- agg->min_latency,
- agg->max_latency);
- if (throttle_delay)
- {
- fprintf(logfile, " %.0f %.0f %.0f %.0f",
- agg->sum_lag,
- agg->sum2_lag,
- agg->min_lag,
- agg->max_lag);
- if (latency_limit)
- fprintf(logfile, " %d", agg->skipped);
- }
- fputc('\n', logfile);
-
- /* move to the next inteval */
- agg->start_time = agg->start_time + agg_interval;
-
- /* reset for "no transaction" intervals */
- agg->cnt = 0;
- agg->skipped = 0;
- agg->min_latency = 0;
- agg->max_latency = 0;
- agg->sum_latency = 0;
- agg->sum2_latency = 0;
- agg->min_lag = 0;
- agg->max_lag = 0;
- agg->sum_lag = 0;
- agg->sum2_lag = 0;
- }
- /* reset the values to include only the current transaction. */
- agg->cnt = 1;
- agg->skipped = skipped ? 1 : 0;
- agg->min_latency = latency;
- agg->max_latency = latency;
- agg->sum_latency = skipped ? 0.0 : latency;
- agg->sum2_latency = skipped ? 0.0 : latency * latency;
- agg->min_lag = lag;
- agg->max_lag = lag;
- agg->sum_lag = lag;
- agg->sum2_lag = lag * lag;
- }
+ /* accumulate the current transaction */
+ accumStats(agg, skipped, latency, lag);
}
else
{
/* This is more than we really ought to know about instr_time */
if (skipped)
- fprintf(logfile, "%d %d skipped %d %ld %ld",
+ fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
st->id, st->cnt, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
else
- fprintf(logfile, "%d %d %.0f %d %ld %ld",
+ fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
if (skipped)
- fprintf(logfile, "%d %d skipped %d 0 0",
+ fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
st->id, st->cnt, st->use_file);
else
- fprintf(logfile, "%d %d %.0f %d 0 0",
+ fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
}
}
+/*
+ * Accumulate and report statistics at end of a transaction.
+ *
+ * (This is also called when a transaction is late and thus skipped.)
+ */
+static void
+processXactStats(TState *thread, CState *st, instr_time *now,
+ bool skipped, FILE *logfile, StatsData *agg)
+{
+ double latency = 0.0,
+ lag = 0.0;
+
+ if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
+ INSTR_TIME_SET_CURRENT(*now);
+
+ if (!skipped)
+ {
+ /* compute latency & lag */
+ latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
+ lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+ }
+
+ if (progress || throttle_delay || latency_limit)
+ {
+ accumStats(&thread->stats, skipped, latency, lag);
+
+ /* count transactions over the latency limit, if needed */
+ if (latency_limit && latency > latency_limit)
+ thread->latency_late++;
+ }
+ else
+ thread->stats.cnt++;
+
+ if (use_log)
+ doLog(thread, st, logfile, now, agg, skipped, latency, lag);
+}
+
+
/* discard connections */
static void
disconnect_all(CState *state, int length)
my_commands->command_num = num_commands++;
my_commands->type = 0; /* until set */
my_commands->argc = 0;
+ initSimpleStats(&my_commands->stats);
if (*p == '\\')
{
static void
listAvailableScripts(void)
{
- int i;
+ int i;
fprintf(stderr, "Available builtin scripts:\n");
for (i = 0; i < N_BUILTIN; i++)
num_scripts++;
}
+static void
+printSimpleStats(char *prefix, SimpleStats *ss)
+{
+ /* print NaN if no transactions where executed */
+ double latency = ss->sum / ss->count;
+ double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
+
+ printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
+ printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
+}
+
/* print out results */
static void
-printResults(int64 normal_xacts, int nclients,
- TState *threads, int nthreads,
- instr_time total_time, instr_time conn_total_time,
- int64 total_latencies, int64 total_sqlats,
- int64 throttle_lag, int64 throttle_lag_max,
- int64 throttle_latency_skipped, int64 latency_late)
+printResults(TState *threads, StatsData *total, instr_time total_time,
+ instr_time conn_total_time, int latency_late)
{
double time_include,
tps_include,
tps_exclude;
time_include = INSTR_TIME_GET_DOUBLE(total_time);
- tps_include = normal_xacts / time_include;
- tps_exclude = normal_xacts / (time_include -
+ tps_include = total->cnt / time_include;
+ tps_exclude = total->cnt / (time_include -
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
printf("transaction type: %s\n",
if (duration <= 0)
{
printf("number of transactions per client: %d\n", nxacts);
- printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
- normal_xacts, (int64) nxacts * nclients);
+ printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
+ total->cnt, nxacts * nclients);
}
else
{
printf("duration: %d s\n", duration);
printf("number of transactions actually processed: " INT64_FORMAT "\n",
- normal_xacts);
+ total->cnt);
}
/* Remaining stats are nonsensical if we failed to execute any xacts */
- if (normal_xacts <= 0)
+ if (total->cnt <= 0)
return;
if (throttle_delay && latency_limit)
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
- throttle_latency_skipped,
- 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+ total->skipped,
+ 100.0 * total->skipped / (total->skipped + total->cnt));
if (latency_limit)
- printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+ printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
latency_limit / 1000.0, latency_late,
- 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+ 100.0 * latency_late / (total->skipped + total->cnt));
if (throttle_delay || progress || latency_limit)
- {
- /* compute and show latency average and standard deviation */
- double latency = 0.001 * total_latencies / normal_xacts;
- double sqlat = (double) total_sqlats / normal_xacts;
-
- printf("latency average: %.3f ms\n"
- "latency stddev: %.3f ms\n",
- latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
- }
+ printSimpleStats("latency", &total->latency);
else
- {
/* only an average latency computed from the duration is available */
printf("latency average: %.3f ms\n",
- 1000.0 * duration * nclients / normal_xacts);
- }
+ 1000.0 * duration * nclients / total->cnt);
if (throttle_delay)
{
* the database load, or the Poisson throttling process.
*/
printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
- 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+ 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
}
printf("tps = %f (including connections establishing)\n", tps_include);
printf(" - statement latencies in milliseconds:\n");
for (commands = sql_script[i].commands; *commands != NULL; commands++)
- {
- Command *command = *commands;
- int cnum = command->command_num;
- double total_time;
- instr_time total_exec_elapsed;
- int total_exec_count;
- int t;
-
- /* Accumulate per-thread data for command */
- INSTR_TIME_SET_ZERO(total_exec_elapsed);
- total_exec_count = 0;
- for (t = 0; t < nthreads; t++)
- {
- TState *thread = &threads[t];
-
- INSTR_TIME_ADD(total_exec_elapsed,
- thread->exec_elapsed[cnum]);
- total_exec_count += thread->exec_count[cnum];
- }
-
- if (total_exec_count > 0)
- total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
- else
- total_time = 0.0;
-
- printf("\t%f\t%s\n", total_time, command->line);
- }
+ printf(" %11.3f %s\n",
+ 1000.0 * (*commands)->stats.sum / (*commands)->stats.count,
+ (*commands)->line);
}
}
}
};
int c;
- int nclients = 1; /* default number of simulated clients */
- int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
instr_time start_time; /* start up time */
instr_time total_time;
instr_time conn_total_time;
- int64 total_xacts = 0;
- int64 total_latencies = 0;
- int64 total_sqlats = 0;
- int64 throttle_lag = 0;
- int64 throttle_lag_max = 0;
- int64 throttle_latency_skipped = 0;
int64 latency_late = 0;
+ StatsData stats;
char *desc;
int i;
case 'S':
addScript(desc,
process_builtin(findBuiltin("select-only", &desc),
- desc));
+ desc));
benchmarking_option_set = true;
internal_script_used = true;
break;
case 'N':
addScript(desc,
process_builtin(findBuiltin("simple-update", &desc),
- desc));
+ desc));
benchmarking_option_set = true;
internal_script_used = true;
break;
* changed after fork.
*/
main_pid = (int) getpid();
- progress_nclients = nclients;
- progress_nthreads = nthreads;
if (nclients > 1)
{
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
- thread->throttle_latency_skipped = 0;
thread->latency_late = 0;
+ initStats(&thread->stats, 0.0);
nclients_dealt += thread->nstate;
-
- if (is_latencies)
- {
- /* Reserve memory for the thread to store per-command latencies */
- int t;
-
- thread->exec_elapsed = (instr_time *)
- pg_malloc(sizeof(instr_time) * num_commands);
- thread->exec_count = (int *)
- pg_malloc(sizeof(int) * num_commands);
-
- for (t = 0; t < num_commands; t++)
- {
- INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
- thread->exec_count[t] = 0;
- }
- }
- else
- {
- thread->exec_elapsed = NULL;
- thread->exec_count = NULL;
- }
}
/* all clients must be assigned to a thread */
#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
+ initStats(&stats, 0.0);
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
- int j;
#ifdef ENABLE_THREAD_SAFETY
if (threads[i].thread == INVALID_THREAD)
(void) threadRun(thread);
#endif /* ENABLE_THREAD_SAFETY */
- /* thread level stats */
- throttle_lag += thread->throttle_lag;
- throttle_latency_skipped += threads->throttle_latency_skipped;
+ /* aggregate thread level stats */
+ mergeSimpleStats(&stats.latency, &thread->stats.latency);
+ mergeSimpleStats(&stats.lag, &thread->stats.lag);
+ stats.cnt += thread->stats.cnt;
+ stats.skipped += thread->stats.skipped;
latency_late += thread->latency_late;
- if (throttle_lag_max > thread->throttle_lag_max)
- throttle_lag_max = thread->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
-
- /* client-level stats */
- for (j = 0; j < thread->nstate; j++)
- {
- total_xacts += thread->state[j].cnt;
- total_latencies += thread->state[j].txn_latencies;
- total_sqlats += thread->state[j].txn_sqlats;
- }
}
disconnect_all(state, nclients);
*/
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
- printResults(total_xacts, nclients, threads, nthreads,
- total_time, conn_total_time, total_latencies, total_sqlats,
- throttle_lag, throttle_lag_max, throttle_latency_skipped,
- latency_late);
+ printResults(threads, &stats, total_time, conn_total_time, latency_late);
return 0;
}
int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
int64 last_report = thread_start;
int64 next_report = last_report + (int64) progress * 1000000;
- int64 last_count = 0,
- last_lats = 0,
- last_sqlats = 0,
- last_lags = 0,
- last_skipped = 0;
-
- AggVals aggs;
+ StatsData last,
+ aggs;
/*
* Initialize throttling rate target for all of the thread's clients. It
*/
INSTR_TIME_SET_CURRENT(start);
thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
- thread->throttle_lag = 0;
- thread->throttle_lag_max = 0;
INSTR_TIME_SET_ZERO(thread->conn_time);
INSTR_TIME_SET_CURRENT(thread->conn_time);
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
- agg_vals_init(&aggs, thread->start_time);
+ initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
+ last = aggs;
/* send start up queries in async manner */
for (i = 0; i < nstate; i++)
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].name);
- if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, logfile, &aggs))
remains--; /* I've aborted */
}
if (now >= next_report)
{
/* generate and show report */
- int64 count = 0,
- lats = 0,
- sqlats = 0,
- lags = 0,
- skipped = 0;
+ StatsData cur;
int64 run = now - last_report;
double tps,
total_run,
* (If a read from a 64-bit integer is not atomic, you might
* get a "torn" read and completely bogus latencies though!)
*/
- for (i = 0; i < progress_nclients; i++)
+ initStats(&cur, 0.0);
+ for (i = 0; i < nthreads; i++)
{
- count += state[i].cnt;
- lats += state[i].txn_latencies;
- sqlats += state[i].txn_sqlats;
- }
-
- for (i = 0; i < progress_nthreads; i++)
- {
- skipped += thread[i].throttle_latency_skipped;
- lags += thread[i].throttle_lag;
+ mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
+ mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
+ cur.cnt += thread[i].stats.cnt;
+ cur.skipped += thread[i].stats.skipped;
}
total_run = (now - thread_start) / 1000000.0;
- tps = 1000000.0 * (count - last_count) / run;
- latency = 0.001 * (lats - last_lats) / (count - last_count);
- sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+ tps = 1000000.0 * (cur.cnt - last.cnt) / run;
+ latency = 0.001 * (cur.latency.sum - last.latency.sum) /
+ (cur.cnt - last.cnt);
+ sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
+ / (cur.cnt - last.cnt);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
- lag = 0.001 * (lags - last_lags) / (count - last_count);
+ lag = 0.001 * (cur.lag.sum - last.lag.sum) /
+ (cur.cnt - last.cnt);
if (progress_timestamp)
sprintf(tbuf, "%.03f s",
fprintf(stderr, ", lag %.3f ms", lag);
if (latency_limit)
fprintf(stderr, ", " INT64_FORMAT " skipped",
- skipped - last_skipped);
+ cur.skipped - last.skipped);
}
fprintf(stderr, "\n");
- last_count = count;
- last_lats = lats;
- last_sqlats = sqlats;
- last_lags = lags;
+ last = cur;
last_report = now;
- last_skipped = skipped;
/*
* Ensure that the next report is in the future, in case
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
+ {
+ if (agg_interval)
+ {
+ /* log aggregated but not yet reported transactions */
+ doLog(thread, state, logfile, &end, &aggs, false, 0, 0);
+ }
fclose(logfile);
+ }
return NULL;
}