From 1bc90f7a7b7441a88e2c6d4a0e9b6f9c1499ad30 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 3 Jul 2015 11:48:54 +0300 Subject: [PATCH] Remove thread-emulation support from pgbench. You can no longer use pgbench with multiple threads when compiled without --enable-thread-safety. That's an acceptable limitation these days; it still works fine with -j1, and all modern platforms support threads anyway. This makes future maintenance and development of the code easier. Fabien Coelho --- src/bin/pgbench/pgbench.c | 313 ++++++++------------------------------ 1 file changed, 66 insertions(+), 247 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 59e70b6f37..95be62cbbb 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -70,20 +70,8 @@ static int pthread_join(pthread_t th, void **thread_return); /* Use platform-dependent pthread capability */ #include #else -/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */ -#define PTHREAD_FORK_EMULATION -#include - -#define pthread_t pg_pthread_t -#define pthread_attr_t pg_pthread_attr_t -#define pthread_create pg_pthread_create -#define pthread_join pg_pthread_join - -typedef struct fork_pthread *pthread_t; -typedef int pthread_attr_t; - -static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); -static int pthread_join(pthread_t th, void **thread_return); +/* No threads implementation, use none (-j 1) */ +#define pthread_t void * #endif @@ -210,8 +198,6 @@ typedef struct PGconn *con; /* connection handle to DB */ int id; /* client No. */ int state; /* state No. */ - int cnt; /* xacts count */ - int ecnt; /* error count */ int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ @@ -221,15 +207,19 @@ typedef struct int64 txn_scheduled; /* scheduled start time of transaction (usec) */ instr_time txn_begin; /* used for measuring schedule lag times */ instr_time stmt_begin; /* used for measuring statement latencies */ - int64 txn_latencies; /* cumulated latencies */ - int64 txn_sqlats; /* cumulated square latencies */ bool is_throttled; /* whether transaction throttling is done */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; + + /* per client collected stats */ + int cnt; /* xacts count */ + int ecnt; /* error count */ + int64 txn_latencies; /* cumulated latencies */ + int64 txn_sqlats; /* cumulated square latencies */ } CState; /* - * Thread state and result + * Thread state */ typedef struct { @@ -242,6 +232,9 @@ typedef struct int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ int64 throttle_trigger; /* previous/next throttling (us) */ + + /* per thread collected stats */ + instr_time conn_time; int64 throttle_lag; /* total transaction lag behind throttling */ int64 throttle_lag_max; /* max transaction lag */ int64 throttle_latency_skipped; /* lagging transactions @@ -251,18 +244,6 @@ typedef struct #define INVALID_THREAD ((pthread_t) 0) -typedef struct -{ - instr_time conn_time; - int64 xacts; - int64 latencies; - int64 sqlats; - int64 throttle_lag; - int64 throttle_lag_max; - int64 throttle_latency_skipped; - int64 latency_late; -} TResult; - /* * queries read from files */ @@ -2926,6 +2907,13 @@ main(int argc, char **argv) fprintf(stderr, "invalid number of threads: %d\n", nthreads); exit(1); } +#ifndef ENABLE_THREAD_SAFETY + if (nthreads != 1) + { + fprintf(stderr, "threads are not supported on this platform, use -j1\n"); + exit(1); + } +#endif /* !ENABLE_THREAD_SAFETY */ break; case 'C': benchmarking_option_set = true; @@ -3194,22 +3182,6 @@ main(int argc, char **argv) exit(1); } - /* - * is_latencies only works with multiple threads in thread-based - * implementations, not fork-based ones, because it supposes that the - * parent can see changes made to the per-thread execution stats by child - * threads. It seems useful enough to accept despite this limitation, but - * perhaps we should FIXME someday (by passing the stats data back up - * through the parent-to-child pipes). - */ -#ifndef ENABLE_THREAD_SAFETY - if (is_latencies && nthreads > 1) - { - fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n"); - exit(1); - } -#endif - /* * save main process id in the global variable because process id will be * changed after fork. @@ -3414,6 +3386,7 @@ main(int argc, char **argv) setalarm(duration); /* start threads */ +#ifdef ENABLE_THREAD_SAFETY for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; @@ -3436,32 +3409,43 @@ main(int argc, char **argv) thread->thread = INVALID_THREAD; } } +#else + INSTR_TIME_SET_CURRENT(threads[0].start_time); + threads[0].thread = INVALID_THREAD; +#endif /* ENABLE_THREAD_SAFETY */ /* wait for threads and accumulate results */ INSTR_TIME_SET_ZERO(conn_total_time); for (i = 0; i < nthreads; i++) { - void *ret = NULL; + TState *thread = &threads[i]; + int j; +#ifdef ENABLE_THREAD_SAFETY if (threads[i].thread == INVALID_THREAD) - ret = threadRun(&threads[i]); + /* actually run this thread directly in the main thread */ + (void) threadRun(thread); else - pthread_join(threads[i].thread, &ret); + /* wait of other threads. should check that 0 is returned? */ + pthread_join(thread->thread, NULL); +#else + (void) threadRun(thread); +#endif /* ENABLE_THREAD_SAFETY */ - if (ret != NULL) - { - TResult *r = (TResult *) ret; + /* thread level stats */ + throttle_lag += thread->throttle_lag; + throttle_latency_skipped = threads->throttle_latency_skipped; + latency_late = thread->latency_late; + if (throttle_lag_max > thread->throttle_lag_max) + throttle_lag_max = thread->throttle_lag_max; + INSTR_TIME_ADD(conn_total_time, thread->conn_time); - total_xacts += r->xacts; - total_latencies += r->latencies; - total_sqlats += r->sqlats; - throttle_lag += r->throttle_lag; - throttle_latency_skipped += r->throttle_latency_skipped; - latency_late += r->latency_late; - if (r->throttle_lag_max > throttle_lag_max) - throttle_lag_max = r->throttle_lag_max; - INSTR_TIME_ADD(conn_total_time, r->conn_time); - free(ret); + /* client-level stats */ + for (j = 0; j < thread->nstate; j++) + { + total_xacts += thread->state[j].cnt; + total_latencies += thread->state[i].txn_latencies; + total_sqlats += thread->state[i].txn_sqlats; } } disconnect_all(state, nclients); @@ -3491,7 +3475,6 @@ threadRun(void *arg) { TState *thread = (TState *) arg; CState *state = thread->state; - TResult *result; FILE *logfile = NULL; /* per-thread log file */ instr_time start, end; @@ -3522,9 +3505,7 @@ threadRun(void *arg) thread->throttle_lag = 0; thread->throttle_lag_max = 0; - result = pg_malloc(sizeof(TResult)); - - INSTR_TIME_SET_ZERO(result->conn_time); + INSTR_TIME_SET_ZERO(thread->conn_time); /* open log file if requested */ if (use_log) @@ -3555,8 +3536,8 @@ threadRun(void *arg) } /* time after thread and connections set up */ - INSTR_TIME_SET_CURRENT(result->conn_time); - INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + INSTR_TIME_SET_CURRENT(thread->conn_time); + INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time); agg_vals_init(&aggs, thread->start_time); @@ -3568,7 +3549,7 @@ threadRun(void *arg) int prev_ecnt = st->ecnt; st->use_file = getrand(thread, 0, num_files - 1); - if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) + if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs)) remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) @@ -3650,11 +3631,7 @@ threadRun(void *arg) } /* also wake up to print the next progress report on time */ - if (progress && min_usec > 0 -#if !defined(PTHREAD_FORK_EMULATION) - && thread->tid == 0 -#endif /* !PTHREAD_FORK_EMULATION */ - ) + if (progress && min_usec > 0) { /* get current time if needed */ if (now_usec == 0) @@ -3710,7 +3687,7 @@ threadRun(void *arg) if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) || commands[st->state]->type == META_COMMAND)) { - if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) + if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs)) remains--; /* I've aborted */ } @@ -3723,76 +3700,6 @@ threadRun(void *arg) } } -#ifdef PTHREAD_FORK_EMULATION - /* each process reports its own progression */ - if (progress) - { - instr_time now_time; - int64 now; - - INSTR_TIME_SET_CURRENT(now_time); - now = INSTR_TIME_GET_MICROSEC(now_time); - if (now >= next_report) - { - /* generate and show report */ - int64 count = 0, - lats = 0, - sqlats = 0, - skipped = 0; - int64 lags = thread->throttle_lag; - int64 run = now - last_report; - double tps, - total_run, - latency, - sqlat, - stdev, - lag; - - for (i = 0; i < nstate; i++) - { - count += state[i].cnt; - lats += state[i].txn_latencies; - sqlats += state[i].txn_sqlats; - } - - total_run = (now - thread_start) / 1000000.0; - tps = 1000000.0 * (count - last_count) / run; - latency = 0.001 * (lats - last_lats) / (count - last_count); - sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); - stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (lags - last_lags) / (count - last_count); - skipped = thread->throttle_latency_skipped - last_skipped; - - fprintf(stderr, - "progress %d: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f", - thread->tid, total_run, tps, latency, stdev); - if (throttle_delay) - { - fprintf(stderr, ", lag %.3f ms", lag); - if (latency_limit) - fprintf(stderr, ", skipped " INT64_FORMAT, skipped); - } - fprintf(stderr, "\n"); - - last_count = count; - last_lats = lats; - last_sqlats = sqlats; - last_lags = lags; - last_report = now; - last_skipped = thread->throttle_latency_skipped; - - /* - * Ensure that the next report is in the future, in case - * pgbench/postgres got stuck somewhere. - */ - do - { - next_report += (int64) progress *1000000; - } while (now >= next_report); - } - } -#else /* progress report by thread 0 for all threads */ if (progress && thread->tid == 0) { @@ -3817,6 +3724,17 @@ threadRun(void *arg) lag, stdev; + /* + * Add up the statistics of all threads. + * + * XXX: No locking. There is no guarantee that we get an + * atomic snapshot of the transaction count and latencies, so + * these figures can well be off by a small amount. The + * progress is report's purpose is to give a quick overview of + * how the test is going, so that shouldn't matter too much. + * (If a read from a 64-bit integer is not atomic, you might + * get a "torn" read and completely bogus latencies though!) + */ for (i = 0; i < progress_nclients; i++) { count += state[i].cnt; @@ -3864,31 +3782,16 @@ threadRun(void *arg) } while (now >= next_report); } } -#endif /* PTHREAD_FORK_EMULATION */ } done: INSTR_TIME_SET_CURRENT(start); disconnect_all(state, nstate); - result->xacts = 0; - result->latencies = 0; - result->sqlats = 0; - for (i = 0; i < nstate; i++) - { - result->xacts += state[i].cnt; - result->latencies += state[i].txn_latencies; - result->sqlats += state[i].txn_sqlats; - } - result->throttle_lag = thread->throttle_lag; - result->throttle_lag_max = thread->throttle_lag_max; - result->throttle_latency_skipped = thread->throttle_latency_skipped; - result->latency_late = thread->latency_late; - INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); + INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); if (logfile) fclose(logfile); - return result; + return NULL; } /* @@ -3910,90 +3813,6 @@ setalarm(int seconds) alarm(seconds); } -#ifndef ENABLE_THREAD_SAFETY - -/* - * implements pthread using fork. - */ - -typedef struct fork_pthread -{ - pid_t pid; - int pipes[2]; -} fork_pthread; - -static int -pthread_create(pthread_t *thread, - pthread_attr_t *attr, - void *(*start_routine) (void *), - void *arg) -{ - fork_pthread *th; - void *ret; - int rc; - - th = (fork_pthread *) pg_malloc(sizeof(fork_pthread)); - if (pipe(th->pipes) < 0) - { - free(th); - return errno; - } - - th->pid = fork(); - if (th->pid == -1) /* error */ - { - free(th); - return errno; - } - if (th->pid != 0) /* in parent process */ - { - close(th->pipes[1]); - *thread = th; - return 0; - } - - /* in child process */ - close(th->pipes[0]); - - /* set alarm again because the child does not inherit timers */ - if (duration > 0) - setalarm(duration); - - ret = start_routine(arg); - rc = write(th->pipes[1], ret, sizeof(TResult)); - (void) rc; - close(th->pipes[1]); - free(th); - exit(0); -} - -static int -pthread_join(pthread_t th, void **thread_return) -{ - int status; - - while (waitpid(th->pid, &status, 0) != th->pid) - { - if (errno != EINTR) - return errno; - } - - if (thread_return != NULL) - { - /* assume result is TResult */ - *thread_return = pg_malloc(sizeof(TResult)); - if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) - { - free(*thread_return); - *thread_return = NULL; - } - } - close(th->pipes[0]); - - free(th); - return 0; -} -#endif #else /* WIN32 */ static VOID CALLBACK -- 2.40.0