]> granicus.if.org Git - libevent/commitdiff
Provide OpenSSL style support for multiple threads accessing the same event_base
authorNiels Provos <provos@gmail.com>
Sun, 2 Mar 2008 21:18:33 +0000 (21:18 +0000)
committerNiels Provos <provos@gmail.com>
Sun, 2 Mar 2008 21:18:33 +0000 (21:18 +0000)
svn:r684

14 files changed:
ChangeLog
Makefile.am
autogen.sh
configure.in
event-internal.h
event.c
evthread-internal.h [new file with mode: 0644]
http.c
include/Makefile.am
m4/acx_pthread.m4 [new file with mode: 0644]
test/Makefile.am
test/regress.c
test/regress.h
test/regress_pthread.c [new file with mode: 0644]

index 07caa5f24029fb4738f548d372846dae43271ec5..c172189f3173815aa64605cd32e5dc6a025db491 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -56,6 +56,7 @@ Changes in current version:
  o rewrite of the evbuffer code to reduce memory copies
  o Some older Solaris versions demand that _REENTRANT be defined to get strtok_r(); do so.
  o Do not free the kqop file descriptor in other processes, also allow it to be 0; from Andrei Nigmatulin
+ o Provide OpenSSL style support for multiple threads accessing the same event_base
 
 Changes in 1.4.0:
  o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.
index 87144bf39a07d01a256c1150d5ba93be3a6d6650..5fab46a07f6f88d5ec613a2c58230d2a04243343 100644 (file)
@@ -29,7 +29,7 @@ VERSION_INFO = 2:0:0
 bin_SCRIPTS = event_rpcgen.py
 
 EXTRA_DIST = autogen.sh event.h event-internal.h log.h evsignal.h evdns.3 \
-       evrpc.h evrpc-internal.h min_heap.h \
+       evrpc.h evrpc-internal.h min_heap.h evthread-internal.h \
        event.3 \
        kqueue.c epoll_sub.c epoll.c select.c poll.c signal.c \
        evport.c devpoll.c event_rpcgen.py \
index 6d4275a6392ad865955939a9ee5f5e577070d1c0..da16b75774c82f0269523aa48016b03aa617532c 100755 (executable)
@@ -4,7 +4,7 @@ SYSNAME=`uname`
 if [ "x$SYSNAME" = "xDarwin" ] ; then
   LIBTOOLIZE=glibtoolize
 fi
-aclocal && \
+aclocal -I m4 && \
        autoheader && \
        $LIBTOOLIZE && \
        autoconf && \
index e6582a0356fd2c16d71712bf9dc16b44b2b7022e..59ccd4a677e16c281389d308d44190d4533274ac 100644 (file)
@@ -22,7 +22,9 @@ fi
 
 AC_ARG_ENABLE(gcc-warnings,
      AS_HELP_STRING(--enable-gcc-warnings, enable verbose warnings with GCC))
-
+AC_ARG_ENABLE(thread-support,
+     AS_HELP_STRING(--enable-thread-support, enable support for threading),
+       [], [enable_thread_support=yes])
 AC_PROG_LIBTOOL
 
 dnl   Uncomment "AC_DISABLE_SHARED" to make shared librraries not get
@@ -351,6 +353,20 @@ AC_TRY_COMPILE([],
          [Define to appropriate substitue if compiler doesnt have __func__])))
 
 
+# check if we can compile with pthreads for the unittests
+have_pthreads=no
+ACX_PTHREAD([
+       AC_DEFINE(HAVE_PTHREADS, 1,
+               [Define if we have pthreads on this system])
+       have_pthreads=yes])
+AM_CONDITIONAL(PTHREAD_REGRESS, [test "$have_pthreads" != "no"])
+
+# check if we should compile locking into the  library
+if test x$enable_thread_support = xno; then
+   AC_DEFINE(DISABLE_THREAD_SUPPORT, 1,
+       [Define if libevent should not be compiled with thread support])
+fi
+
 # Add some more warnings which we use in development but not in the
 # released versions.  (Some relevant gcc versions can't handle these.)
 if test x$enable_gcc_warnings = xyes; then
index f61757c118bb62d62bd072a409c055d0f5c9f88b..bc8cc7db413f588c80d45401404f3c25e843fc2c 100644 (file)
@@ -67,6 +67,13 @@ struct event_base {
        struct timeval event_tv;
 
        struct min_heap timeheap;
+
+       /* threading support */
+       unsigned long th_owner_id;
+       unsigned long (*th_get_id)(void);
+       void (*th_lock)(int mode, int locknum);
+       int th_notify_fd[2];
+       struct event th_notify;
 };
 
 /* Internal use only: Functions that might be missing from <sys/queue.h> */
diff --git a/event.c b/event.c
index e2dc1f9a4f7438e2639619fca1ff2b77eae3c13f..a31f3ec6582a71cd83870358cd071db0b0e674d1 100644 (file)
--- a/event.c
+++ b/event.c
 #include <sys/_time.h>
 #endif
 #include <sys/queue.h>
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
 #include <stdio.h>
 #include <stdlib.h>
-#ifndef WIN32
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
 #include <errno.h>
@@ -54,6 +57,8 @@
 
 #include "event.h"
 #include "event-internal.h"
+#include "evthread-internal.h"
+#include "event2/thread.h"
 #include "evutil.h"
 #include "log.h"
 
@@ -115,6 +120,10 @@ int (*event_sigcb)(void);          /* Signal callback when gotsig is set */
 volatile sig_atomic_t event_gotsig;    /* Set in signal handler */
 
 /* Prototypes */
+static inline int event_add_internal(struct event *ev, struct timeval *tv);
+static inline int event_del_internal(struct event *ev);
+static inline void event_active_internal(struct event *ev, int res,short count);
+
 static void    event_queue_insert(struct event_base *, struct event *, int);
 static void    event_queue_remove(struct event_base *, struct event *, int);
 static int     event_haveevents(struct event_base *);
@@ -204,6 +213,10 @@ event_base_new(void)
        /* allocate a single active event queue */
        event_base_priority_init(base, 1);
 
+       /* prepare for threading */
+       base->th_notify_fd[0] = -1;
+       base->th_notify_fd[1] = -1;
+
        return (base);
 }
 
@@ -220,6 +233,14 @@ event_base_free(struct event_base *base)
 
        /* XXX(niels) - check for internal events first */
        assert(base);
+
+       /* threading fds if we have them */
+       if (base->th_notify_fd[0] != -1) {
+               event_del(&base->th_notify);
+               close(base->th_notify_fd[0]);
+               close(base->th_notify_fd[1]);
+       }
+
        /* Delete all non-internal events. */
        for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) {
                struct event *next = TAILQ_NEXT(ev, ev_next);
@@ -343,6 +364,8 @@ event_process_active(struct event_base *base)
        int i;
        short ncalls;
 
+       EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
        for (i = 0; i < base->nactivequeues; ++i) {
                if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
                        activeq = base->activequeues[i];
@@ -356,8 +379,11 @@ event_process_active(struct event_base *base)
                if (ev->ev_events & EV_PERSIST)
                        event_queue_remove(base, ev, EVLIST_ACTIVE);
                else
-                       event_del(ev);
+                       event_del_internal(ev);
                
+               EVTHREAD_RELEASE_LOCK(base,
+                   EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
                /* Allows deletes to work */
                ncalls = ev->ev_ncalls;
                ev->ev_pncalls = &ncalls;
@@ -368,7 +394,11 @@ event_process_active(struct event_base *base)
                        if (event_gotsig || base->event_break)
                                return;
                }
+               EVTHREAD_ACQUIRE_LOCK(base,
+                   EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
        }
+
+       EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
 }
 
 /*
@@ -613,7 +643,7 @@ event_set(struct event *ev, evutil_socket_t fd, short events,
        min_heap_elem_init(ev);
 
        /* by default, we put new events into the middle priority */
-       if(current_base)
+       if (current_base)
                ev->ev_pri = current_base->nactivequeues/2;
 }
 
@@ -683,10 +713,25 @@ event_pending(struct event *ev, short event, struct timeval *tv)
 
 int
 event_add(struct event *ev, struct timeval *tv)
+{
+       int res;
+
+       EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+       
+       res = event_add_internal(ev, tv);
+
+       EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
+       return (res);
+}
+
+static inline int
+event_add_internal(struct event *ev, struct timeval *tv)
 {
        struct event_base *base = ev->ev_base;
        const struct eventop *evsel = base->evsel;
        void *evbase = base->evbase;
+       int res = 0;
 
        event_debug((
                 "event_add: event: %p, %s%s%scall %p",
@@ -705,8 +750,8 @@ event_add(struct event *ev, struct timeval *tv)
                        event_queue_remove(base, ev, EVLIST_TIMEOUT);
                else if (min_heap_reserve(&base->timeheap,
                        1 + min_heap_size(&base->timeheap)) == -1)
-                   return (-1);  /* ENOMEM == errno */
-
+                               return (-1);  /* ENOMEM == errno */
+                           
                /* Check if it is active due to a timeout.  Rescheduling
                 * this timeout before the callback can be executed
                 * removes it from the active list. */
@@ -735,29 +780,44 @@ event_add(struct event *ev, struct timeval *tv)
 
        if ((ev->ev_events & (EV_READ|EV_WRITE)) &&
            !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
-               int res = evsel->add(evbase, ev);
+               res = evsel->add(evbase, ev);
                if (res != -1)
                        event_queue_insert(base, ev, EVLIST_INSERTED);
-
-               return (res);
        } else if ((ev->ev_events & EV_SIGNAL) &&
            !(ev->ev_flags & EVLIST_SIGNAL)) {
-               int res = evsel->add(evbase, ev);
+               res = evsel->add(evbase, ev);
                if (res != -1)
                        event_queue_insert(base, ev, EVLIST_SIGNAL);
-
-               return (res);
        }
 
-       return (0);
+       /* if we are not in the right thread, we need to wake up the loop */
+       if (res != -1 && !EVTHREAD_IN_THREAD(base))
+               write(base->th_notify_fd[1], "", 1);
+
+       return (res);
 }
 
 int
 event_del(struct event *ev)
+{
+       int res;
+
+       EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+       
+       res = event_del_internal(ev);
+
+       EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
+       return (res);
+}
+
+static inline int
+event_del_internal(struct event *ev)
 {
        struct event_base *base;
        const struct eventop *evsel;
        void *evbase;
+       int res = 0;
 
        event_debug(("event_del: %p, callback %p",
                 ev, ev->ev_callback));
@@ -786,28 +846,47 @@ event_del(struct event *ev)
 
        if (ev->ev_flags & EVLIST_INSERTED) {
                event_queue_remove(base, ev, EVLIST_INSERTED);
-               return (evsel->del(evbase, ev));
+               res = evsel->del(evbase, ev);
        } else if (ev->ev_flags & EVLIST_SIGNAL) {
                event_queue_remove(base, ev, EVLIST_SIGNAL);
-               return (evsel->del(evbase, ev));
+               res = evsel->del(evbase, ev);
        }
 
-       return (0);
+       /* if we are not in the right thread, we need to wake up the loop */
+       if (res != -1 && !EVTHREAD_IN_THREAD(base))
+               write(base->th_notify_fd[1], "", 1);
+
+       return (res);
 }
 
 void
 event_active(struct event *ev, int res, short ncalls)
 {
+       EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
+       event_active_internal(ev, res, ncalls);
+       
+       EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+}
+
+
+static inline void
+event_active_internal(struct event *ev, int res, short ncalls)
+{
+       struct event_base *base;
+
        /* We get different kinds of events, add them together */
        if (ev->ev_flags & EVLIST_ACTIVE) {
                ev->ev_res |= res;
                return;
        }
 
+       base = ev->ev_base;
+
        ev->ev_res = res;
        ev->ev_ncalls = ncalls;
        ev->ev_pncalls = NULL;
-       event_queue_insert(ev->ev_base, ev, EVLIST_ACTIVE);
+       event_queue_insert(base, ev, EVLIST_ACTIVE);
 }
 
 static int
@@ -816,28 +895,36 @@ timeout_next(struct event_base *base, struct timeval **tv_p)
        struct timeval now;
        struct event *ev;
        struct timeval *tv = *tv_p;
+       int res = 0;
+
+       EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+       ev = min_heap_top(&base->timeheap);
 
-       if ((ev = min_heap_top(&base->timeheap)) == NULL) {
+       if (ev == NULL) {
                /* if no time-based events are active wait for I/O */
                *tv_p = NULL;
-               return (0);
+               goto out;
        }
 
-       if (gettime(&now) == -1)
-               return (-1);
+       if (gettime(&now) == -1) {
+               res = -1;
+               goto out;
+       }
 
        if (evutil_timercmp(&ev->ev_timeout, &now, <=)) {
                evutil_timerclear(tv);
-               return (0);
+               goto out;
        }
 
        evutil_timersub(&ev->ev_timeout, &now, tv);
 
        assert(tv->tv_sec >= 0);
        assert(tv->tv_usec >= 0);
-
        event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
-       return (0);
+
+out:
+       EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+       return (res);
 }
 
 /*
@@ -858,8 +945,11 @@ timeout_correct(struct event_base *base, struct timeval *tv)
 
        /* Check if time is running backwards */
        gettime(tv);
+       EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+
        if (evutil_timercmp(tv, &base->event_tv, >=)) {
                base->event_tv = *tv;
+               EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
                return;
        }
 
@@ -877,6 +967,7 @@ timeout_correct(struct event_base *base, struct timeval *tv)
                struct timeval *ev_tv = &(**pev).ev_timeout;
                evutil_timersub(ev_tv, &off, ev_tv);
        }
+       EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
 }
 
 void
@@ -885,8 +976,12 @@ timeout_process(struct event_base *base)
        struct timeval now;
        struct event *ev;
 
-       if (min_heap_empty(&base->timeheap))
+       EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
+       if (min_heap_empty(&base->timeheap)) {
+               EVTHREAD_RELEASE_LOCK(base,
+                   EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
                return;
+       }
 
        gettime(&now);
 
@@ -895,12 +990,13 @@ timeout_process(struct event_base *base)
                        break;
 
                /* delete this event from the I/O queues */
-               event_del(ev);
+               event_del_internal(ev);
 
                event_debug(("timeout_process: call %p",
                         ev->ev_callback));
-               event_active(ev, EV_TIMEOUT, 1);
+               event_active_internal(ev, EV_TIMEOUT, 1);
        }
+       EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK);
 }
 
 void
@@ -1060,3 +1156,66 @@ event_set_mem_functions(void *(*malloc_fn)(size_t sz),
        _event_realloc_fn = realloc_fn;
        _event_free_fn = free_fn;
 }
+
+/* support for threading */
+void
+evthread_set_locking_callback(struct event_base *base,
+    void (*locking_fn)(int mode, int locknum))
+{
+#ifdef DISABLE_THREAD_SUPPORT
+       event_errx(1, "%s: not compiled with thread support", __func__);
+#endif
+       base->th_lock = locking_fn;
+}
+
+static void
+evthread_ignore_fd(int fd, short what, void *arg)
+{
+       struct event_base *base = arg;
+       int buf[128];
+       
+       /* we draining the socket */
+       while (read(fd, buf, sizeof(buf)) != -1)
+               ;
+
+       event_add(&base->th_notify, NULL);
+}
+
+void
+evthread_set_id_callback(struct event_base *base,
+    unsigned long (*id_fn)(void))
+{
+#ifdef DISABLE_THREAD_SUPPORT
+       event_errx(1, "%s: not compiled with thread support", __func__);
+#endif
+       base->th_get_id = id_fn;
+       base->th_owner_id = (*id_fn)();
+       /* 
+        * If another thread wants to add a new event, we need to notify
+        * the thread that owns the base to wakeup for rescheduling.
+        */
+       if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0,
+               base->th_notify_fd) == -1)
+               event_err(1, "%s: socketpair", __func__);
+
+       evutil_make_socket_nonblocking(base->th_notify_fd[0]);
+       evutil_make_socket_nonblocking(base->th_notify_fd[1]);
+
+       /* prepare an event that we can use for wakeup */
+       event_set(&base->th_notify, base->th_notify_fd[0], EV_READ,
+           evthread_ignore_fd, base);
+       event_base_set(base, &base->th_notify);
+       /* we need to mark this as internal event */
+       base->th_notify.ev_flags |= EVLIST_INTERNAL;
+
+       event_add(&base->th_notify, NULL);
+}
+
+int
+evthread_num_locks(void)
+{
+#ifdef DISABLE_THREAD_SUPPORT
+       event_errx(1, "%s: not compiled with thread support", __func__);
+#endif
+       return (EVTHREAD_NUM_LOCKS);
+}
diff --git a/evthread-internal.h b/evthread-internal.h
new file mode 100644 (file)
index 0000000..6b8145c
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2008 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _EVTHREAD_INTERNAL_H_
+#define _EVTHREAD_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "config.h"
+
+enum evthread_locks {
+       EVTHREAD_BASE_LOCK = 0,
+       EVTHREAD_NUM_LOCKS = 1
+};
+
+struct event_base;
+#ifndef DISABLE_THREAD_SUPPORT
+#define EVTHREAD_USE_LOCKS(base) \
+       ((base)->th_lock != NULL)
+
+#define EVTHREAD_IN_THREAD(base) \
+       ((base)->th_get_id == NULL || \
+       (base)->th_owner_id == (*(base)->th_get_id)())
+
+#define EVTHREAD_GET_ID(base) \
+       (*(base)->th_get_id)()
+
+#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock) do {   \
+               if (EVTHREAD_USE_LOCKS(base))           \
+                       (*(base)->th_lock)(EVTHREAD_LOCK | mode, lock); \
+       } while (0)
+
+#define EVTHREAD_RELEASE_LOCK(base, mode, lock) do {   \
+               if (EVTHREAD_USE_LOCKS(base))           \
+                       (*(base)->th_lock)(EVTHREAD_UNLOCK | mode, lock); \
+       } while (0)
+#else /* DISABLE_THREAD_SUPPORT */
+#define EVTHREAD_USE_LOCKS(base)
+#define EVTHREAD_IN_THREAD(base)       1
+#define EVTHREAD_GET_ID(base)
+#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock)
+#define EVTHREAD_RELEASE_LOCK(base, mode, lock)
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _EVTHREAD_INTERNAL_H_ */
diff --git a/http.c b/http.c
index 710c752250bd0a53fbedc33e8dbed3c8e7e68b7f..338e6cfd0080e0eda1b17446c6efbf8236a94182 100644 (file)
--- a/http.c
+++ b/http.c
@@ -951,7 +951,8 @@ static void
 evhttp_connection_stop_detectclose(struct evhttp_connection *evcon)
 {
        evcon->flags &= ~EVHTTP_CON_CLOSEDETECT;
-       event_del(&evcon->close_ev);
+       if (event_initialized(&evcon->close_ev))
+               event_del(&evcon->close_ev);
 }
 
 static void
index b1a1b7ce15bda6392ec416cb6421e7c5e101203c..dadb974ed0e6a022ab021bf72ac524fd675a5f17 100644 (file)
@@ -1,4 +1,4 @@
 AUTOMAKE_OPTIONS = foreign
 
-EXTRA_SRC = event2/buffer.h
-nobase_include_HEADERS = event2/buffer.h
+EXTRA_SRC = event2/buffer.h event2/thread.h
+nobase_include_HEADERS = event2/buffer.h event2/thread.h
diff --git a/m4/acx_pthread.m4 b/m4/acx_pthread.m4
new file mode 100644 (file)
index 0000000..d2b1169
--- /dev/null
@@ -0,0 +1,279 @@
+##### http://autoconf-archive.cryp.to/acx_pthread.html
+#
+# SYNOPSIS
+#
+#   ACX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]])
+#
+# DESCRIPTION
+#
+#   This macro figures out how to build C programs using POSIX threads.
+#   It sets the PTHREAD_LIBS output variable to the threads library and
+#   linker flags, and the PTHREAD_CFLAGS output variable to any special
+#   C compiler flags that are needed. (The user can also force certain
+#   compiler flags/libs to be tested by setting these environment
+#   variables.)
+#
+#   Also sets PTHREAD_CC to any special C compiler that is needed for
+#   multi-threaded programs (defaults to the value of CC otherwise).
+#   (This is necessary on AIX to use the special cc_r compiler alias.)
+#
+#   NOTE: You are assumed to not only compile your program with these
+#   flags, but also link it with them as well. e.g. you should link
+#   with $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS
+#   $LIBS
+#
+#   If you are only building threads programs, you may wish to use
+#   these variables in your default LIBS, CFLAGS, and CC:
+#
+#          LIBS="$PTHREAD_LIBS $LIBS"
+#          CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+#          CC="$PTHREAD_CC"
+#
+#   In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute
+#   constant has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to
+#   that name (e.g. PTHREAD_CREATE_UNDETACHED on AIX).
+#
+#   ACTION-IF-FOUND is a list of shell commands to run if a threads
+#   library is found, and ACTION-IF-NOT-FOUND is a list of commands to
+#   run it if it is not found. If ACTION-IF-FOUND is not specified, the
+#   default action will define HAVE_PTHREAD.
+#
+#   Please let the authors know if this macro fails on any platform, or
+#   if you have any other suggestions or comments. This macro was based
+#   on work by SGJ on autoconf scripts for FFTW (http://www.fftw.org/)
+#   (with help from M. Frigo), as well as ac_pthread and hb_pthread
+#   macros posted by Alejandro Forero Cuervo to the autoconf macro
+#   repository. We are also grateful for the helpful feedback of
+#   numerous users.
+#
+# LAST MODIFICATION
+#
+#   2007-07-29
+#
+# COPYLEFT
+#
+#   Copyright (c) 2007 Steven G. Johnson <stevenj@alum.mit.edu>
+#
+#   This program is free software: you can redistribute it and/or
+#   modify it under the terms of the GNU General Public License as
+#   published by the Free Software Foundation, either version 3 of the
+#   License, or (at your option) any later version.
+#
+#   This program is distributed in the hope that it will be useful, but
+#   WITHOUT ANY WARRANTY; without even the implied warranty of
+#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+#   General Public License for more details.
+#
+#   You should have received a copy of the GNU General Public License
+#   along with this program. If not, see
+#   <http://www.gnu.org/licenses/>.
+#
+#   As a special exception, the respective Autoconf Macro's copyright
+#   owner gives unlimited permission to copy, distribute and modify the
+#   configure scripts that are the output of Autoconf when processing
+#   the Macro. You need not follow the terms of the GNU General Public
+#   License when using or distributing such scripts, even though
+#   portions of the text of the Macro appear in them. The GNU General
+#   Public License (GPL) does govern all other use of the material that
+#   constitutes the Autoconf Macro.
+#
+#   This special exception to the GPL applies to versions of the
+#   Autoconf Macro released by the Autoconf Macro Archive. When you
+#   make and distribute a modified version of the Autoconf Macro, you
+#   may extend this special exception to the GPL to apply to your
+#   modified version as well.
+
+AC_DEFUN([ACX_PTHREAD], [
+AC_REQUIRE([AC_CANONICAL_HOST])
+AC_LANG_SAVE
+AC_LANG_C
+acx_pthread_ok=no
+
+# We used to check for pthread.h first, but this fails if pthread.h
+# requires special compiler flags (e.g. on True64 or Sequent).
+# It gets checked for in the link test anyway.
+
+# First of all, check if the user has set any of the PTHREAD_LIBS,
+# etcetera environment variables, and if threads linking works using
+# them:
+if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then
+        save_CFLAGS="$CFLAGS"
+        CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+        save_LIBS="$LIBS"
+        LIBS="$PTHREAD_LIBS $LIBS"
+        AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS])
+        AC_TRY_LINK_FUNC(pthread_join, acx_pthread_ok=yes)
+        AC_MSG_RESULT($acx_pthread_ok)
+        if test x"$acx_pthread_ok" = xno; then
+                PTHREAD_LIBS=""
+                PTHREAD_CFLAGS=""
+        fi
+        LIBS="$save_LIBS"
+        CFLAGS="$save_CFLAGS"
+fi
+
+# We must check for the threads library under a number of different
+# names; the ordering is very important because some systems
+# (e.g. DEC) have both -lpthread and -lpthreads, where one of the
+# libraries is broken (non-POSIX).
+
+# Create a list of thread flags to try.  Items starting with a "-" are
+# C compiler flags, and other items are library names, except for "none"
+# which indicates that we try without any flags at all, and "pthread-config"
+# which is a program returning the flags for the Pth emulation library.
+
+acx_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config"
+
+# The ordering *is* (sometimes) important.  Some notes on the
+# individual items follow:
+
+# pthreads: AIX (must check this before -lpthread)
+# none: in case threads are in libc; should be tried before -Kthread and
+#       other compiler flags to prevent continual compiler warnings
+# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h)
+# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able)
+# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread)
+# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads)
+# -pthreads: Solaris/gcc
+# -mthreads: Mingw32/gcc, Lynx/gcc
+# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it
+#      doesn't hurt to check since this sometimes defines pthreads too;
+#      also defines -D_REENTRANT)
+#      ... -mt is also the pthreads flag for HP/aCC
+# pthread: Linux, etcetera
+# --thread-safe: KAI C++
+# pthread-config: use pthread-config program (for GNU Pth library)
+
+case "${host_cpu}-${host_os}" in
+        *solaris*)
+
+        # On Solaris (at least, for some versions), libc contains stubbed
+        # (non-functional) versions of the pthreads routines, so link-based
+        # tests will erroneously succeed.  (We need to link with -pthreads/-mt/
+        # -lpthread.)  (The stubs are missing pthread_cleanup_push, or rather
+        # a function called by this macro, so we could check for that, but
+        # who knows whether they'll stub that too in a future libc.)  So,
+        # we'll just look for -pthreads and -lpthread first:
+
+        acx_pthread_flags="-pthreads pthread -mt -pthread $acx_pthread_flags"
+        ;;
+esac
+
+if test x"$acx_pthread_ok" = xno; then
+for flag in $acx_pthread_flags; do
+
+        case $flag in
+                none)
+                AC_MSG_CHECKING([whether pthreads work without any flags])
+                ;;
+
+                -*)
+                AC_MSG_CHECKING([whether pthreads work with $flag])
+                PTHREAD_CFLAGS="$flag"
+                ;;
+
+               pthread-config)
+               AC_CHECK_PROG(acx_pthread_config, pthread-config, yes, no)
+               if test x"$acx_pthread_config" = xno; then continue; fi
+               PTHREAD_CFLAGS="`pthread-config --cflags`"
+               PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`"
+               ;;
+
+                *)
+                AC_MSG_CHECKING([for the pthreads library -l$flag])
+                PTHREAD_LIBS="-l$flag"
+                ;;
+        esac
+
+        save_LIBS="$LIBS"
+        save_CFLAGS="$CFLAGS"
+        LIBS="$PTHREAD_LIBS $LIBS"
+        CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+        # Check for various functions.  We must include pthread.h,
+        # since some functions may be macros.  (On the Sequent, we
+        # need a special flag -Kthread to make this header compile.)
+        # We check for pthread_join because it is in -lpthread on IRIX
+        # while pthread_create is in libc.  We check for pthread_attr_init
+        # due to DEC craziness with -lpthreads.  We check for
+        # pthread_cleanup_push because it is one of the few pthread
+        # functions on Solaris that doesn't have a non-functional libc stub.
+        # We try pthread_create on general principles.
+        AC_TRY_LINK([#include <pthread.h>],
+                    [pthread_t th; pthread_join(th, 0);
+                     pthread_attr_init(0); pthread_cleanup_push(0, 0);
+                     pthread_create(0,0,0,0); pthread_cleanup_pop(0); ],
+                    [acx_pthread_ok=yes])
+
+        LIBS="$save_LIBS"
+        CFLAGS="$save_CFLAGS"
+
+        AC_MSG_RESULT($acx_pthread_ok)
+        if test "x$acx_pthread_ok" = xyes; then
+                break;
+        fi
+
+        PTHREAD_LIBS=""
+        PTHREAD_CFLAGS=""
+done
+fi
+
+# Various other checks:
+if test "x$acx_pthread_ok" = xyes; then
+        save_LIBS="$LIBS"
+        LIBS="$PTHREAD_LIBS $LIBS"
+        save_CFLAGS="$CFLAGS"
+        CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
+
+        # Detect AIX lossage: JOINABLE attribute is called UNDETACHED.
+       AC_MSG_CHECKING([for joinable pthread attribute])
+       attr_name=unknown
+       for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do
+           AC_TRY_LINK([#include <pthread.h>], [int attr=$attr; return attr;],
+                        [attr_name=$attr; break])
+       done
+        AC_MSG_RESULT($attr_name)
+        if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then
+            AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name,
+                               [Define to necessary symbol if this constant
+                                uses a non-standard name on your system.])
+        fi
+
+        AC_MSG_CHECKING([if more special flags are required for pthreads])
+        flag=no
+        case "${host_cpu}-${host_os}" in
+            *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";;
+            *solaris* | *-osf* | *-hpux*) flag="-D_REENTRANT";;
+        esac
+        AC_MSG_RESULT(${flag})
+        if test "x$flag" != xno; then
+            PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS"
+        fi
+
+        LIBS="$save_LIBS"
+        CFLAGS="$save_CFLAGS"
+
+        # More AIX lossage: must compile with xlc_r or cc_r
+       if test x"$GCC" != xyes; then
+          AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC})
+        else
+          PTHREAD_CC=$CC
+       fi
+else
+        PTHREAD_CC="$CC"
+fi
+
+AC_SUBST(PTHREAD_LIBS)
+AC_SUBST(PTHREAD_CFLAGS)
+AC_SUBST(PTHREAD_CC)
+
+# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND:
+if test x"$acx_pthread_ok" = xyes; then
+        ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1])
+        :
+else
+        acx_pthread_ok=no
+        $2
+fi
+AC_LANG_RESTORE
+])dnl ACX_PTHREAD
index 9fa76708a199a11159884ab46e6180a8b9c97857..0e254a80ed22cb8fd707ab87b0c6f3eec2b096ef 100644 (file)
@@ -17,9 +17,13 @@ test_weof_LDADD = ../libevent_core.la
 test_time_SOURCES = test-time.c
 test_time_LDADD = ../libevent_core.la
 regress_SOURCES = regress.c regress.h regress_http.c regress_dns.c \
-       regress_rpc.c \
-       regress.gen.c regress.gen.h
-regress_LDADD = ../libevent.la
+       regress_rpc.c regress.gen.c regress.gen.h
+if PTHREAD_REGRESS
+regress_SOURCES += regress_pthread.c
+endif
+regress_LDADD = ../libevent.la $(PTHREAD_LIBS)
+regress_CFLAGS = -I$(top_srcdir) -I$(top_srcdir)/compat \
+       -I$(top_srcdir)/include  $(PTHREAD_CFLAGS)
 bench_SOURCES = bench.c
 bench_LDADD = ../libevent.la
 bench_cascade_SOURCES = bench_cascade.c
index 9f4f76b64900b6dfacf495b05b56bc8c4d0da59e..48d3a712e606fd2d42238d8e52e0b76eb5180a4b 100644 (file)
@@ -1727,12 +1727,16 @@ main (int argc, char **argv)
 
        test_event_base_new();
 
+#if defined(HAVE_PTHREADS) && !defined(DISABLE_THREAD_SUPPORT)
+       regress_pthread();
+#endif
+       
        http_suite();
 
        rpc_suite();
 
        dns_suite();
-       
+
 #ifndef WIN32
        test_fork();
 #endif
index 4060ff5c6ac855c8bbd8c42712aa9ed1a4c5dff1..c5a4510af7194f563b9006a0a2bd312e849845a4 100644 (file)
@@ -37,6 +37,8 @@ void http_basic_test(void);
 void rpc_suite(void);
 
 void dns_suite(void);
+
+void regress_pthread(void);
        
 #ifdef __cplusplus
 }
diff --git a/test/regress_pthread.c b/test/regress_pthread.c
new file mode 100644 (file)
index 0000000..52d7e10
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2008 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <sys/types.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <pthread.h>
+#include <assert.h>
+
+#include "evutil.h"
+#include "event.h"
+#include "event2/thread.h"
+
+static pthread_mutex_t *all_locks;
+
+struct cond_wait {
+       pthread_mutex_t lock;
+       pthread_cond_t cond;
+};
+
+static void
+basic_timeout(int fd, short what, void *arg)
+{
+       struct cond_wait *cw = arg;
+       assert(pthread_mutex_lock(&cw->lock) == 0);
+       assert(pthread_cond_broadcast(&cw->cond) == 0);
+       assert(pthread_mutex_unlock(&cw->lock) == 0);
+}
+
+#define NUM_THREADS    100
+static pthread_mutex_t count_lock;
+static int count;
+
+static void *
+basic_thread(void *arg)
+{
+       struct cond_wait cw;
+       struct event_base *base = arg;
+       struct event ev;
+       int i = 0;
+
+       assert(pthread_mutex_init(&cw.lock, NULL) == 0);
+       assert(pthread_cond_init(&cw.cond, NULL) == 0);
+
+       evtimer_set(&ev, basic_timeout, &cw);
+       event_base_set(base, &ev);
+       for (i = 0; i < 100; i++) {
+               struct timeval tv;
+               evutil_timerclear(&tv);
+               assert(evtimer_add(&ev, &tv) == 0);
+
+               assert(pthread_mutex_lock(&cw.lock) == 0);
+               assert(pthread_cond_wait(&cw.cond, &cw.lock) == 0);
+               assert(pthread_mutex_unlock(&cw.lock) == 0);
+
+               assert(pthread_mutex_lock(&count_lock) == 0);
+               ++count;
+               assert(pthread_mutex_unlock(&count_lock) == 0);
+       }
+
+       /* exit the loop only if all threads fired all timeouts */
+       assert(pthread_mutex_lock(&count_lock) == 0);
+       if (count >= NUM_THREADS * 100)
+               event_base_loopexit(base, NULL);
+       assert(pthread_mutex_unlock(&count_lock) == 0);
+
+       assert(pthread_cond_destroy(&cw.cond) == 0);
+       assert(pthread_mutex_destroy(&cw.lock) == 0);
+
+       return (NULL);
+}
+
+static void
+pthread_basic(struct event_base *base)
+{
+       pthread_t threads[NUM_THREADS];
+       struct event ev;
+       struct timeval tv;
+       int i;
+
+       fprintf(stdout, "Testing basic pthreads support: ");
+
+       for (i = 0; i < NUM_THREADS; ++i)
+               pthread_create(&threads[i], NULL, basic_thread, base);
+
+       evtimer_set(&ev, NULL, NULL);
+       event_base_set(base, &ev);
+       evutil_timerclear(&tv);
+       tv.tv_sec = 1000;
+       event_add(&ev, &tv);
+
+       event_base_dispatch(base);
+
+       for (i = 0; i < NUM_THREADS; ++i)
+               pthread_join(threads[i], NULL);
+
+       event_del(&ev);
+
+       fprintf(stdout, "OK\n");
+}
+
+static void
+locking(int mode, int locknum)
+{
+       pthread_mutex_t *lock = &all_locks[locknum];
+
+       if (mode & EVTHREAD_LOCK)
+               pthread_mutex_lock(lock);
+       else
+               pthread_mutex_unlock(lock);
+}
+
+static unsigned long
+get_id(void)
+{
+       return (unsigned long)(pthread_self());
+}
+
+void
+regress_pthread()
+{
+       struct event_base *base = event_base_new();
+       int i;
+
+       pthread_mutex_init(&count_lock, NULL);
+
+       all_locks = malloc(evthread_num_locks() * sizeof(pthread_mutex_t));
+       for (i = 0; i < evthread_num_locks(); ++i)
+               pthread_mutex_init(&all_locks[i], NULL);
+
+       evthread_set_locking_callback(base, locking);
+       evthread_set_id_callback(base, get_id);
+
+       pthread_basic(base);
+
+       for (i = 0; i < evthread_num_locks(); ++i)
+               pthread_mutex_destroy(&all_locks[i]);
+
+       pthread_mutex_destroy(&count_lock);
+
+       event_base_free(base);
+}