#include <pthread.h>
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
#include <sys/wait.h>
#define pthread_t pg_pthread_t
bool use_quiet; /* quiet logging onto stderr */
int agg_interval; /* log aggregates instead of individual
* transactions */
+int progress = 0; /* thread progress report every this seconds */
+int progress_nclients = 0; /* number of clients for progress report */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
"(default: simple)\n"
" -n, --no-vacuum do not run VACUUM before tests\n"
" -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
+ " -P, --progress NUM show thread progress report every NUM seconds\n"
" -r, --report-latencies report average latency per command\n"
" -s, --scale=NUM report this scale factor in output\n"
" -S, --select-only perform SELECT-only transactions\n"
{"log", no_argument, NULL, 'l'},
{"no-vacuum", no_argument, NULL, 'n'},
{"port", required_argument, NULL, 'p'},
+ {"progress", required_argument, NULL, 'P'},
{"protocol", required_argument, NULL, 'M'},
{"quiet", no_argument, NULL, 'q'},
{"report-latencies", no_argument, NULL, 'r'},
state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
- while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
switch (c)
+ case 'P':
+ progress = atoi(optarg);
+ if (progress <= 0)
+ {
+ fprintf(stderr,
+ "thread progress delay (-P) must be positive (%s)\n",
+ optarg);
+ exit(1);
+ }
+ break;
case 0:
/* This covers long options which take no argument. */
* changed after fork.
main_pid = (int) getpid();
+ progress_nclients = nclients;
if (nclients > 1)
int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */
int i;
+ /* for reporting progress: */
+ int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
+ int64 last_report = thread_start;
+ int64 next_report = last_report + progress * 1000000;
+ int64 last_count = 0;
AggVals aggs;
st->con = NULL;
+ /* each process reports its own progression */
+ if (progress)
+ {
+ instr_time now_time;
+ int64 now;
+ now = INSTR_TIME_GET_MICROSEC(now_time);
+ if (now >= next_report)
+ {
+ /* generate and show report */
+ int64 count = 0;
+ int64 run = now - last_report;
+ float tps, total_run, latency;
+ for (i = 0 ; i < nstate ; i++)
+ count += state[i].cnt;
+ total_run = (now - thread_start) / 1000000.0;
+ tps = 1000000.0 * (count - last_count) / run;
+ latency = 1000.0 * nstate / tps;
+ fprintf(stderr, "progress %d: %.1f s, %.1f tps, %.3f ms lat\n",
+ thread->tid, total_run, tps, latency);
+ last_count = count;
+ last_report = now;
+ next_report += progress * 1000000;
+ }
+ }
+ /* progress report by thread 0 for all threads */
+ if (progress && thread->tid == 0)
+ {
+ instr_time now_time;
+ int64 now;
+ now = INSTR_TIME_GET_MICROSEC(now_time);
+ if (now >= next_report)
+ {
+ /* generate and show report */
+ int64 count = 0;
+ int64 run = now - last_report;
+ float tps, total_run, latency;
+ for (i = 0 ; i < progress_nclients ; i++)
+ count += state[i].cnt;
+ total_run = (now - thread_start) / 1000000.0;
+ tps = 1000000.0 * (count - last_count) / run;
+ latency = 1000.0 * progress_nclients / tps;
+ fprintf(stderr, "progress: %.1f s, %.1f tps, %.3f ms lat\n",
+ total_run, tps, latency);
+ last_count = count;
+ last_report = now;
+ next_report += progress * 1000000;
+ }
+ }