/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t pg_pthread_t
-#define pthread_attr_t pg_pthread_attr_t
-#define pthread_create pg_pthread_create
-#define pthread_join pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
#endif
PGconn *con; /* connection handle to DB */
int id; /* client No. */
int state; /* state No. */
- int cnt; /* xacts count */
- int ecnt; /* error count */
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int64 txn_latencies; /* cumulated latencies */
- int64 txn_sqlats; /* cumulated square latencies */
bool is_throttled; /* whether transaction throttling is done */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
+
+ /* per client collected stats */
+ int cnt; /* xacts count */
+ int ecnt; /* error count */
+ int64 txn_latencies; /* cumulated latencies */
+ int64 txn_sqlats; /* cumulated square latencies */
} CState;
/*
- * Thread state and result
+ * Thread state
*/
typedef struct
{
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
+
+ /* per thread collected stats */
+ instr_time conn_time;
int64 throttle_lag; /* total transaction lag behind throttling */
int64 throttle_lag_max; /* max transaction lag */
int64 throttle_latency_skipped; /* lagging transactions
#define INVALID_THREAD ((pthread_t) 0)
-typedef struct
-{
- instr_time conn_time;
- int64 xacts;
- int64 latencies;
- int64 sqlats;
- int64 throttle_lag;
- int64 throttle_lag_max;
- int64 throttle_latency_skipped;
- int64 latency_late;
-} TResult;
-
/*
* queries read from files
*/
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
exit(1);
}
+#ifndef ENABLE_THREAD_SAFETY
+ if (nthreads != 1)
+ {
+ fprintf(stderr, "threads are not supported on this platform, use -j1\n");
+ exit(1);
+ }
+#endif /* !ENABLE_THREAD_SAFETY */
break;
case 'C':
benchmarking_option_set = true;
exit(1);
}
- /*
- * is_latencies only works with multiple threads in thread-based
- * implementations, not fork-based ones, because it supposes that the
- * parent can see changes made to the per-thread execution stats by child
- * threads. It seems useful enough to accept despite this limitation, but
- * perhaps we should FIXME someday (by passing the stats data back up
- * through the parent-to-child pipes).
- */
-#ifndef ENABLE_THREAD_SAFETY
- if (is_latencies && nthreads > 1)
- {
- fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
- exit(1);
- }
-#endif
-
/*
* save main process id in the global variable because process id will be
* changed after fork.
setalarm(duration);
/* start threads */
+#ifdef ENABLE_THREAD_SAFETY
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
thread->thread = INVALID_THREAD;
}
}
+#else
+ INSTR_TIME_SET_CURRENT(threads[0].start_time);
+ threads[0].thread = INVALID_THREAD;
+#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
- void *ret = NULL;
+ TState *thread = &threads[i];
+ int j;
+#ifdef ENABLE_THREAD_SAFETY
if (threads[i].thread == INVALID_THREAD)
- ret = threadRun(&threads[i]);
+ /* actually run this thread directly in the main thread */
+ (void) threadRun(thread);
else
- pthread_join(threads[i].thread, &ret);
+ /* wait of other threads. should check that 0 is returned? */
+ pthread_join(thread->thread, NULL);
+#else
+ (void) threadRun(thread);
+#endif /* ENABLE_THREAD_SAFETY */
- if (ret != NULL)
- {
- TResult *r = (TResult *) ret;
+ /* thread level stats */
+ throttle_lag += thread->throttle_lag;
+ throttle_latency_skipped = threads->throttle_latency_skipped;
+ latency_late = thread->latency_late;
+ if (throttle_lag_max > thread->throttle_lag_max)
+ throttle_lag_max = thread->throttle_lag_max;
+ INSTR_TIME_ADD(conn_total_time, thread->conn_time);
- total_xacts += r->xacts;
- total_latencies += r->latencies;
- total_sqlats += r->sqlats;
- throttle_lag += r->throttle_lag;
- throttle_latency_skipped += r->throttle_latency_skipped;
- latency_late += r->latency_late;
- if (r->throttle_lag_max > throttle_lag_max)
- throttle_lag_max = r->throttle_lag_max;
- INSTR_TIME_ADD(conn_total_time, r->conn_time);
- free(ret);
+ /* client-level stats */
+ for (j = 0; j < thread->nstate; j++)
+ {
+ total_xacts += thread->state[j].cnt;
+ total_latencies += thread->state[i].txn_latencies;
+ total_sqlats += thread->state[i].txn_sqlats;
}
}
disconnect_all(state, nclients);
{
TState *thread = (TState *) arg;
CState *state = thread->state;
- TResult *result;
FILE *logfile = NULL; /* per-thread log file */
instr_time start,
end;
thread->throttle_lag = 0;
thread->throttle_lag_max = 0;
- result = pg_malloc(sizeof(TResult));
-
- INSTR_TIME_SET_ZERO(result->conn_time);
+ INSTR_TIME_SET_ZERO(thread->conn_time);
/* open log file if requested */
if (use_log)
}
/* time after thread and connections set up */
- INSTR_TIME_SET_CURRENT(result->conn_time);
- INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+ INSTR_TIME_SET_CURRENT(thread->conn_time);
+ INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
agg_vals_init(&aggs, thread->start_time);
int prev_ecnt = st->ecnt;
st->use_file = getrand(thread, 0, num_files - 1);
- if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
}
/* also wake up to print the next progress report on time */
- if (progress && min_usec > 0
-#if !defined(PTHREAD_FORK_EMULATION)
- && thread->tid == 0
-#endif /* !PTHREAD_FORK_EMULATION */
- )
+ if (progress && min_usec > 0)
{
/* get current time if needed */
if (now_usec == 0)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
}
}
}
-#ifdef PTHREAD_FORK_EMULATION
- /* each process reports its own progression */
- if (progress)
- {
- instr_time now_time;
- int64 now;
-
- INSTR_TIME_SET_CURRENT(now_time);
- now = INSTR_TIME_GET_MICROSEC(now_time);
- if (now >= next_report)
- {
- /* generate and show report */
- int64 count = 0,
- lats = 0,
- sqlats = 0,
- skipped = 0;
- int64 lags = thread->throttle_lag;
- int64 run = now - last_report;
- double tps,
- total_run,
- latency,
- sqlat,
- stdev,
- lag;
-
- for (i = 0; i < nstate; i++)
- {
- count += state[i].cnt;
- lats += state[i].txn_latencies;
- sqlats += state[i].txn_sqlats;
- }
-
- total_run = (now - thread_start) / 1000000.0;
- tps = 1000000.0 * (count - last_count) / run;
- latency = 0.001 * (lats - last_lats) / (count - last_count);
- sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
- stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
- lag = 0.001 * (lags - last_lags) / (count - last_count);
- skipped = thread->throttle_latency_skipped - last_skipped;
-
- fprintf(stderr,
- "progress %d: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f",
- thread->tid, total_run, tps, latency, stdev);
- if (throttle_delay)
- {
- fprintf(stderr, ", lag %.3f ms", lag);
- if (latency_limit)
- fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
- }
- fprintf(stderr, "\n");
-
- last_count = count;
- last_lats = lats;
- last_sqlats = sqlats;
- last_lags = lags;
- last_report = now;
- last_skipped = thread->throttle_latency_skipped;
-
- /*
- * Ensure that the next report is in the future, in case
- * pgbench/postgres got stuck somewhere.
- */
- do
- {
- next_report += (int64) progress *1000000;
- } while (now >= next_report);
- }
- }
-#else
/* progress report by thread 0 for all threads */
if (progress && thread->tid == 0)
{
lag,
stdev;
+ /*
+ * Add up the statistics of all threads.
+ *
+ * XXX: No locking. There is no guarantee that we get an
+ * atomic snapshot of the transaction count and latencies, so
+ * these figures can well be off by a small amount. The
+ * progress is report's purpose is to give a quick overview of
+ * how the test is going, so that shouldn't matter too much.
+ * (If a read from a 64-bit integer is not atomic, you might
+ * get a "torn" read and completely bogus latencies though!)
+ */
for (i = 0; i < progress_nclients; i++)
{
count += state[i].cnt;
} while (now >= next_report);
}
}
-#endif /* PTHREAD_FORK_EMULATION */
}
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
- result->xacts = 0;
- result->latencies = 0;
- result->sqlats = 0;
- for (i = 0; i < nstate; i++)
- {
- result->xacts += state[i].cnt;
- result->latencies += state[i].txn_latencies;
- result->sqlats += state[i].txn_sqlats;
- }
- result->throttle_lag = thread->throttle_lag;
- result->throttle_lag_max = thread->throttle_lag_max;
- result->throttle_latency_skipped = thread->throttle_latency_skipped;
- result->latency_late = thread->latency_late;
-
INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
fclose(logfile);
- return result;
+ return NULL;
}
/*
alarm(seconds);
}
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
- pid_t pid;
- int pipes[2];
-} fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
- pthread_attr_t *attr,
- void *(*start_routine) (void *),
- void *arg)
-{
- fork_pthread *th;
- void *ret;
- int rc;
-
- th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
- if (pipe(th->pipes) < 0)
- {
- free(th);
- return errno;
- }
-
- th->pid = fork();
- if (th->pid == -1) /* error */
- {
- free(th);
- return errno;
- }
- if (th->pid != 0) /* in parent process */
- {
- close(th->pipes[1]);
- *thread = th;
- return 0;
- }
-
- /* in child process */
- close(th->pipes[0]);
-
- /* set alarm again because the child does not inherit timers */
- if (duration > 0)
- setalarm(duration);
-
- ret = start_routine(arg);
- rc = write(th->pipes[1], ret, sizeof(TResult));
- (void) rc;
- close(th->pipes[1]);
- free(th);
- exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
- int status;
-
- while (waitpid(th->pid, &status, 0) != th->pid)
- {
- if (errno != EINTR)
- return errno;
- }
-
- if (thread_return != NULL)
- {
- /* assume result is TResult */
- *thread_return = pg_malloc(sizeof(TResult));
- if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
- {
- free(*thread_return);
- *thread_return = NULL;
- }
- }
- close(th->pipes[0]);
-
- free(th);
- return 0;
-}
-#endif
#else /* WIN32 */
static VOID CALLBACK