]> granicus.if.org Git - postgresql/commitdiff
Remove thread-emulation support from pgbench.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 3 Jul 2015 08:48:54 +0000 (11:48 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 3 Jul 2015 08:51:36 +0000 (11:51 +0300)
You can no longer use pgbench with multiple threads when compiled without
--enable-thread-safety. That's an acceptable limitation these days; it
still works fine with -j1, and all modern platforms support threads anyway.
This makes future maintenance and development of the code easier.

Fabien Coelho

src/bin/pgbench/pgbench.c

index 59e70b6f375bb561966fbf68879f77485d070e6b..95be62cbbbbc85637bcd942dfa8e90df65d301b4 100644 (file)
@@ -70,20 +70,8 @@ static int   pthread_join(pthread_t th, void **thread_return);
 /* Use platform-dependent pthread capability */
 #include <pthread.h>
 #else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t                              pg_pthread_t
-#define pthread_attr_t                 pg_pthread_attr_t
-#define pthread_create                 pg_pthread_create
-#define pthread_join                   pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int     pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int     pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
 #endif
 
 
@@ -210,8 +198,6 @@ typedef struct
        PGconn     *con;                        /* connection handle to DB */
        int                     id;                             /* client No. */
        int                     state;                  /* state No. */
-       int                     cnt;                    /* xacts count */
-       int                     ecnt;                   /* error count */
        int                     listen;                 /* 0 indicates that an async query has been
                                                                 * sent */
        int                     sleeping;               /* 1 indicates that the client is napping */
@@ -221,15 +207,19 @@ typedef struct
        int64           txn_scheduled;  /* scheduled start time of transaction (usec) */
        instr_time      txn_begin;              /* used for measuring schedule lag times */
        instr_time      stmt_begin;             /* used for measuring statement latencies */
-       int64           txn_latencies;  /* cumulated latencies */
-       int64           txn_sqlats;             /* cumulated square latencies */
        bool            is_throttled;   /* whether transaction throttling is done */
        int                     use_file;               /* index in sql_files for this client */
        bool            prepared[MAX_FILES];
+
+       /* per client collected stats */
+       int                     cnt;                    /* xacts count */
+       int                     ecnt;                   /* error count */
+       int64           txn_latencies;  /* cumulated latencies */
+       int64           txn_sqlats;             /* cumulated square latencies */
 } CState;
 
 /*
- * Thread state and result
+ * Thread state
  */
 typedef struct
 {
@@ -242,6 +232,9 @@ typedef struct
        int                *exec_count;         /* number of cmd executions (per Command) */
        unsigned short random_state[3];         /* separate randomness for each thread */
        int64           throttle_trigger;               /* previous/next throttling (us) */
+
+       /* per thread collected stats */
+       instr_time      conn_time;
        int64           throttle_lag;   /* total transaction lag behind throttling */
        int64           throttle_lag_max;               /* max transaction lag */
        int64           throttle_latency_skipped;               /* lagging transactions
@@ -251,18 +244,6 @@ typedef struct
 
 #define INVALID_THREAD         ((pthread_t) 0)
 
-typedef struct
-{
-       instr_time      conn_time;
-       int64           xacts;
-       int64           latencies;
-       int64           sqlats;
-       int64           throttle_lag;
-       int64           throttle_lag_max;
-       int64           throttle_latency_skipped;
-       int64           latency_late;
-} TResult;
-
 /*
  * queries read from files
  */
@@ -2926,6 +2907,13 @@ main(int argc, char **argv)
                                        fprintf(stderr, "invalid number of threads: %d\n", nthreads);
                                        exit(1);
                                }
+#ifndef ENABLE_THREAD_SAFETY
+                               if (nthreads != 1)
+                               {
+                                       fprintf(stderr, "threads are not supported on this platform, use -j1\n");
+                                       exit(1);
+                               }
+#endif   /* !ENABLE_THREAD_SAFETY */
                                break;
                        case 'C':
                                benchmarking_option_set = true;
@@ -3194,22 +3182,6 @@ main(int argc, char **argv)
                exit(1);
        }
 
-       /*
-        * is_latencies only works with multiple threads in thread-based
-        * implementations, not fork-based ones, because it supposes that the
-        * parent can see changes made to the per-thread execution stats by child
-        * threads.  It seems useful enough to accept despite this limitation, but
-        * perhaps we should FIXME someday (by passing the stats data back up
-        * through the parent-to-child pipes).
-        */
-#ifndef ENABLE_THREAD_SAFETY
-       if (is_latencies && nthreads > 1)
-       {
-               fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
-               exit(1);
-       }
-#endif
-
        /*
         * save main process id in the global variable because process id will be
         * changed after fork.
@@ -3414,6 +3386,7 @@ main(int argc, char **argv)
                setalarm(duration);
 
        /* start threads */
+#ifdef ENABLE_THREAD_SAFETY
        for (i = 0; i < nthreads; i++)
        {
                TState     *thread = &threads[i];
@@ -3436,32 +3409,43 @@ main(int argc, char **argv)
                        thread->thread = INVALID_THREAD;
                }
        }
+#else
+       INSTR_TIME_SET_CURRENT(threads[0].start_time);
+       threads[0].thread = INVALID_THREAD;
+#endif   /* ENABLE_THREAD_SAFETY */
 
        /* wait for threads and accumulate results */
        INSTR_TIME_SET_ZERO(conn_total_time);
        for (i = 0; i < nthreads; i++)
        {
-               void       *ret = NULL;
+               TState     *thread = &threads[i];
+               int                     j;
 
+#ifdef ENABLE_THREAD_SAFETY
                if (threads[i].thread == INVALID_THREAD)
-                       ret = threadRun(&threads[i]);
+                       /* actually run this thread directly in the main thread */
+                       (void) threadRun(thread);
                else
-                       pthread_join(threads[i].thread, &ret);
+                       /* wait of other threads. should check that 0 is returned? */
+                       pthread_join(thread->thread, NULL);
+#else
+               (void) threadRun(thread);
+#endif   /* ENABLE_THREAD_SAFETY */
 
-               if (ret != NULL)
-               {
-                       TResult    *r = (TResult *) ret;
+               /* thread level stats */
+               throttle_lag += thread->throttle_lag;
+               throttle_latency_skipped = threads->throttle_latency_skipped;
+               latency_late = thread->latency_late;
+               if (throttle_lag_max > thread->throttle_lag_max)
+                       throttle_lag_max = thread->throttle_lag_max;
+               INSTR_TIME_ADD(conn_total_time, thread->conn_time);
 
-                       total_xacts += r->xacts;
-                       total_latencies += r->latencies;
-                       total_sqlats += r->sqlats;
-                       throttle_lag += r->throttle_lag;
-                       throttle_latency_skipped += r->throttle_latency_skipped;
-                       latency_late += r->latency_late;
-                       if (r->throttle_lag_max > throttle_lag_max)
-                               throttle_lag_max = r->throttle_lag_max;
-                       INSTR_TIME_ADD(conn_total_time, r->conn_time);
-                       free(ret);
+               /* client-level stats */
+               for (j = 0; j < thread->nstate; j++)
+               {
+                       total_xacts += thread->state[j].cnt;
+                       total_latencies += thread->state[i].txn_latencies;
+                       total_sqlats += thread->state[i].txn_sqlats;
                }
        }
        disconnect_all(state, nclients);
@@ -3491,7 +3475,6 @@ threadRun(void *arg)
 {
        TState     *thread = (TState *) arg;
        CState     *state = thread->state;
-       TResult    *result;
        FILE       *logfile = NULL; /* per-thread log file */
        instr_time      start,
                                end;
@@ -3522,9 +3505,7 @@ threadRun(void *arg)
        thread->throttle_lag = 0;
        thread->throttle_lag_max = 0;
 
-       result = pg_malloc(sizeof(TResult));
-
-       INSTR_TIME_SET_ZERO(result->conn_time);
+       INSTR_TIME_SET_ZERO(thread->conn_time);
 
        /* open log file if requested */
        if (use_log)
@@ -3555,8 +3536,8 @@ threadRun(void *arg)
        }
 
        /* time after thread and connections set up */
-       INSTR_TIME_SET_CURRENT(result->conn_time);
-       INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+       INSTR_TIME_SET_CURRENT(thread->conn_time);
+       INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
        agg_vals_init(&aggs, thread->start_time);
 
@@ -3568,7 +3549,7 @@ threadRun(void *arg)
                int                     prev_ecnt = st->ecnt;
 
                st->use_file = getrand(thread, 0, num_files - 1);
-               if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+               if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
                        remains--;                      /* I've aborted */
 
                if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3650,11 +3631,7 @@ threadRun(void *arg)
                }
 
                /* also wake up to print the next progress report on time */
-               if (progress && min_usec > 0
-#if !defined(PTHREAD_FORK_EMULATION)
-                       && thread->tid == 0
-#endif   /* !PTHREAD_FORK_EMULATION */
-                       )
+               if (progress && min_usec > 0)
                {
                        /* get current time if needed */
                        if (now_usec == 0)
@@ -3710,7 +3687,7 @@ threadRun(void *arg)
                        if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
                                                        || commands[st->state]->type == META_COMMAND))
                        {
-                               if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+                               if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
                                        remains--;      /* I've aborted */
                        }
 
@@ -3723,76 +3700,6 @@ threadRun(void *arg)
                        }
                }
 
-#ifdef PTHREAD_FORK_EMULATION
-               /* each process reports its own progression */
-               if (progress)
-               {
-                       instr_time      now_time;
-                       int64           now;
-
-                       INSTR_TIME_SET_CURRENT(now_time);
-                       now = INSTR_TIME_GET_MICROSEC(now_time);
-                       if (now >= next_report)
-                       {
-                               /* generate and show report */
-                               int64           count = 0,
-                                                       lats = 0,
-                                                       sqlats = 0,
-                                                       skipped = 0;
-                               int64           lags = thread->throttle_lag;
-                               int64           run = now - last_report;
-                               double          tps,
-                                                       total_run,
-                                                       latency,
-                                                       sqlat,
-                                                       stdev,
-                                                       lag;
-
-                               for (i = 0; i < nstate; i++)
-                               {
-                                       count += state[i].cnt;
-                                       lats += state[i].txn_latencies;
-                                       sqlats += state[i].txn_sqlats;
-                               }
-
-                               total_run = (now - thread_start) / 1000000.0;
-                               tps = 1000000.0 * (count - last_count) / run;
-                               latency = 0.001 * (lats - last_lats) / (count - last_count);
-                               sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
-                               stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-                               lag = 0.001 * (lags - last_lags) / (count - last_count);
-                               skipped = thread->throttle_latency_skipped - last_skipped;
-
-                               fprintf(stderr,
-                                               "progress %d: %.1f s, %.1f tps, "
-                                               "lat %.3f ms stddev %.3f",
-                                               thread->tid, total_run, tps, latency, stdev);
-                               if (throttle_delay)
-                               {
-                                       fprintf(stderr, ", lag %.3f ms", lag);
-                                       if (latency_limit)
-                                               fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
-                               }
-                               fprintf(stderr, "\n");
-
-                               last_count = count;
-                               last_lats = lats;
-                               last_sqlats = sqlats;
-                               last_lags = lags;
-                               last_report = now;
-                               last_skipped = thread->throttle_latency_skipped;
-
-                               /*
-                                * Ensure that the next report is in the future, in case
-                                * pgbench/postgres got stuck somewhere.
-                                */
-                               do
-                               {
-                                       next_report += (int64) progress *1000000;
-                               } while (now >= next_report);
-                       }
-               }
-#else
                /* progress report by thread 0 for all threads */
                if (progress && thread->tid == 0)
                {
@@ -3817,6 +3724,17 @@ threadRun(void *arg)
                                                        lag,
                                                        stdev;
 
+                               /*
+                                * Add up the statistics of all threads.
+                                *
+                                * XXX: No locking. There is no guarantee that we get an
+                                * atomic snapshot of the transaction count and latencies, so
+                                * these figures can well be off by a small amount. The
+                                * progress is report's purpose is to give a quick overview of
+                                * how the test is going, so that shouldn't matter too much.
+                                * (If a read from a 64-bit integer is not atomic, you might
+                                * get a "torn" read and completely bogus latencies though!)
+                                */
                                for (i = 0; i < progress_nclients; i++)
                                {
                                        count += state[i].cnt;
@@ -3864,31 +3782,16 @@ threadRun(void *arg)
                                } while (now >= next_report);
                        }
                }
-#endif   /* PTHREAD_FORK_EMULATION */
        }
 
 done:
        INSTR_TIME_SET_CURRENT(start);
        disconnect_all(state, nstate);
-       result->xacts = 0;
-       result->latencies = 0;
-       result->sqlats = 0;
-       for (i = 0; i < nstate; i++)
-       {
-               result->xacts += state[i].cnt;
-               result->latencies += state[i].txn_latencies;
-               result->sqlats += state[i].txn_sqlats;
-       }
-       result->throttle_lag = thread->throttle_lag;
-       result->throttle_lag_max = thread->throttle_lag_max;
-       result->throttle_latency_skipped = thread->throttle_latency_skipped;
-       result->latency_late = thread->latency_late;
-
        INSTR_TIME_SET_CURRENT(end);
-       INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+       INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
        if (logfile)
                fclose(logfile);
-       return result;
+       return NULL;
 }
 
 /*
@@ -3910,90 +3813,6 @@ setalarm(int seconds)
        alarm(seconds);
 }
 
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
-       pid_t           pid;
-       int                     pipes[2];
-}      fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
-                          pthread_attr_t *attr,
-                          void *(*start_routine) (void *),
-                          void *arg)
-{
-       fork_pthread *th;
-       void       *ret;
-       int                     rc;
-
-       th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
-       if (pipe(th->pipes) < 0)
-       {
-               free(th);
-               return errno;
-       }
-
-       th->pid = fork();
-       if (th->pid == -1)                      /* error */
-       {
-               free(th);
-               return errno;
-       }
-       if (th->pid != 0)                       /* in parent process */
-       {
-               close(th->pipes[1]);
-               *thread = th;
-               return 0;
-       }
-
-       /* in child process */
-       close(th->pipes[0]);
-
-       /* set alarm again because the child does not inherit timers */
-       if (duration > 0)
-               setalarm(duration);
-
-       ret = start_routine(arg);
-       rc = write(th->pipes[1], ret, sizeof(TResult));
-       (void) rc;
-       close(th->pipes[1]);
-       free(th);
-       exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-       int                     status;
-
-       while (waitpid(th->pid, &status, 0) != th->pid)
-       {
-               if (errno != EINTR)
-                       return errno;
-       }
-
-       if (thread_return != NULL)
-       {
-               /* assume result is TResult */
-               *thread_return = pg_malloc(sizeof(TResult));
-               if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
-               {
-                       free(*thread_return);
-                       *thread_return = NULL;
-               }
-       }
-       close(th->pipes[0]);
-
-       free(th);
-       return 0;
-}
-#endif
 #else                                                  /* WIN32 */
 
 static VOID CALLBACK