From: Niels Provos Date: Sun, 2 Mar 2008 21:18:33 +0000 (+0000) Subject: Provide OpenSSL style support for multiple threads accessing the same event_base X-Git-Tag: release-2.0.1-alpha~399 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=558de9b3776bf378fe2416c1c86639bc7abef3db;p=libevent Provide OpenSSL style support for multiple threads accessing the same event_base svn:r684 --- diff --git a/ChangeLog b/ChangeLog index 07caa5f2..c172189f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -56,6 +56,7 @@ Changes in current version: o rewrite of the evbuffer code to reduce memory copies o Some older Solaris versions demand that _REENTRANT be defined to get strtok_r(); do so. o Do not free the kqop file descriptor in other processes, also allow it to be 0; from Andrei Nigmatulin + o Provide OpenSSL style support for multiple threads accessing the same event_base Changes in 1.4.0: o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. diff --git a/Makefile.am b/Makefile.am index 87144bf3..5fab46a0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -29,7 +29,7 @@ VERSION_INFO = 2:0:0 bin_SCRIPTS = event_rpcgen.py EXTRA_DIST = autogen.sh event.h event-internal.h log.h evsignal.h evdns.3 \ - evrpc.h evrpc-internal.h min_heap.h \ + evrpc.h evrpc-internal.h min_heap.h evthread-internal.h \ event.3 \ kqueue.c epoll_sub.c epoll.c select.c poll.c signal.c \ evport.c devpoll.c event_rpcgen.py \ diff --git a/autogen.sh b/autogen.sh index 6d4275a6..da16b757 100755 --- a/autogen.sh +++ b/autogen.sh @@ -4,7 +4,7 @@ SYSNAME=`uname` if [ "x$SYSNAME" = "xDarwin" ] ; then LIBTOOLIZE=glibtoolize fi -aclocal && \ +aclocal -I m4 && \ autoheader && \ $LIBTOOLIZE && \ autoconf && \ diff --git a/configure.in b/configure.in index e6582a03..59ccd4a6 100644 --- a/configure.in +++ b/configure.in @@ -22,7 +22,9 @@ fi AC_ARG_ENABLE(gcc-warnings, AS_HELP_STRING(--enable-gcc-warnings, enable verbose warnings with GCC)) - +AC_ARG_ENABLE(thread-support, + AS_HELP_STRING(--enable-thread-support, enable support for threading), + [], [enable_thread_support=yes]) AC_PROG_LIBTOOL dnl Uncomment "AC_DISABLE_SHARED" to make shared librraries not get @@ -351,6 +353,20 @@ AC_TRY_COMPILE([], [Define to appropriate substitue if compiler doesnt have __func__]))) +# check if we can compile with pthreads for the unittests +have_pthreads=no +ACX_PTHREAD([ + AC_DEFINE(HAVE_PTHREADS, 1, + [Define if we have pthreads on this system]) + have_pthreads=yes]) +AM_CONDITIONAL(PTHREAD_REGRESS, [test "$have_pthreads" != "no"]) + +# check if we should compile locking into the library +if test x$enable_thread_support = xno; then + AC_DEFINE(DISABLE_THREAD_SUPPORT, 1, + [Define if libevent should not be compiled with thread support]) +fi + # Add some more warnings which we use in development but not in the # released versions. (Some relevant gcc versions can't handle these.) if test x$enable_gcc_warnings = xyes; then diff --git a/event-internal.h b/event-internal.h index f61757c1..bc8cc7db 100644 --- a/event-internal.h +++ b/event-internal.h @@ -67,6 +67,13 @@ struct event_base { struct timeval event_tv; struct min_heap timeheap; + + /* threading support */ + unsigned long th_owner_id; + unsigned long (*th_get_id)(void); + void (*th_lock)(int mode, int locknum); + int th_notify_fd[2]; + struct event th_notify; }; /* Internal use only: Functions that might be missing from */ diff --git a/event.c b/event.c index e2dc1f9a..a31f3ec6 100644 --- a/event.c +++ b/event.c @@ -41,9 +41,12 @@ #include #endif #include +#ifdef HAVE_SYS_SOCKET_H +#include +#endif #include #include -#ifndef WIN32 +#ifdef HAVE_UNISTD_H #include #endif #include @@ -54,6 +57,8 @@ #include "event.h" #include "event-internal.h" +#include "evthread-internal.h" +#include "event2/thread.h" #include "evutil.h" #include "log.h" @@ -115,6 +120,10 @@ int (*event_sigcb)(void); /* Signal callback when gotsig is set */ volatile sig_atomic_t event_gotsig; /* Set in signal handler */ /* Prototypes */ +static inline int event_add_internal(struct event *ev, struct timeval *tv); +static inline int event_del_internal(struct event *ev); +static inline void event_active_internal(struct event *ev, int res,short count); + static void event_queue_insert(struct event_base *, struct event *, int); static void event_queue_remove(struct event_base *, struct event *, int); static int event_haveevents(struct event_base *); @@ -204,6 +213,10 @@ event_base_new(void) /* allocate a single active event queue */ event_base_priority_init(base, 1); + /* prepare for threading */ + base->th_notify_fd[0] = -1; + base->th_notify_fd[1] = -1; + return (base); } @@ -220,6 +233,14 @@ event_base_free(struct event_base *base) /* XXX(niels) - check for internal events first */ assert(base); + + /* threading fds if we have them */ + if (base->th_notify_fd[0] != -1) { + event_del(&base->th_notify); + close(base->th_notify_fd[0]); + close(base->th_notify_fd[1]); + } + /* Delete all non-internal events. */ for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) { struct event *next = TAILQ_NEXT(ev, ev_next); @@ -343,6 +364,8 @@ event_process_active(struct event_base *base) int i; short ncalls; + EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + for (i = 0; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(base->activequeues[i]) != NULL) { activeq = base->activequeues[i]; @@ -356,8 +379,11 @@ event_process_active(struct event_base *base) if (ev->ev_events & EV_PERSIST) event_queue_remove(base, ev, EVLIST_ACTIVE); else - event_del(ev); + event_del_internal(ev); + EVTHREAD_RELEASE_LOCK(base, + EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + /* Allows deletes to work */ ncalls = ev->ev_ncalls; ev->ev_pncalls = &ncalls; @@ -368,7 +394,11 @@ event_process_active(struct event_base *base) if (event_gotsig || base->event_break) return; } + EVTHREAD_ACQUIRE_LOCK(base, + EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); } + + EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); } /* @@ -613,7 +643,7 @@ event_set(struct event *ev, evutil_socket_t fd, short events, min_heap_elem_init(ev); /* by default, we put new events into the middle priority */ - if(current_base) + if (current_base) ev->ev_pri = current_base->nactivequeues/2; } @@ -683,10 +713,25 @@ event_pending(struct event *ev, short event, struct timeval *tv) int event_add(struct event *ev, struct timeval *tv) +{ + int res; + + EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + + res = event_add_internal(ev, tv); + + EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + + return (res); +} + +static inline int +event_add_internal(struct event *ev, struct timeval *tv) { struct event_base *base = ev->ev_base; const struct eventop *evsel = base->evsel; void *evbase = base->evbase; + int res = 0; event_debug(( "event_add: event: %p, %s%s%scall %p", @@ -705,8 +750,8 @@ event_add(struct event *ev, struct timeval *tv) event_queue_remove(base, ev, EVLIST_TIMEOUT); else if (min_heap_reserve(&base->timeheap, 1 + min_heap_size(&base->timeheap)) == -1) - return (-1); /* ENOMEM == errno */ - + return (-1); /* ENOMEM == errno */ + /* Check if it is active due to a timeout. Rescheduling * this timeout before the callback can be executed * removes it from the active list. */ @@ -735,29 +780,44 @@ event_add(struct event *ev, struct timeval *tv) if ((ev->ev_events & (EV_READ|EV_WRITE)) && !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { - int res = evsel->add(evbase, ev); + res = evsel->add(evbase, ev); if (res != -1) event_queue_insert(base, ev, EVLIST_INSERTED); - - return (res); } else if ((ev->ev_events & EV_SIGNAL) && !(ev->ev_flags & EVLIST_SIGNAL)) { - int res = evsel->add(evbase, ev); + res = evsel->add(evbase, ev); if (res != -1) event_queue_insert(base, ev, EVLIST_SIGNAL); - - return (res); } - return (0); + /* if we are not in the right thread, we need to wake up the loop */ + if (res != -1 && !EVTHREAD_IN_THREAD(base)) + write(base->th_notify_fd[1], "", 1); + + return (res); } int event_del(struct event *ev) +{ + int res; + + EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + + res = event_del_internal(ev); + + EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + + return (res); +} + +static inline int +event_del_internal(struct event *ev) { struct event_base *base; const struct eventop *evsel; void *evbase; + int res = 0; event_debug(("event_del: %p, callback %p", ev, ev->ev_callback)); @@ -786,28 +846,47 @@ event_del(struct event *ev) if (ev->ev_flags & EVLIST_INSERTED) { event_queue_remove(base, ev, EVLIST_INSERTED); - return (evsel->del(evbase, ev)); + res = evsel->del(evbase, ev); } else if (ev->ev_flags & EVLIST_SIGNAL) { event_queue_remove(base, ev, EVLIST_SIGNAL); - return (evsel->del(evbase, ev)); + res = evsel->del(evbase, ev); } - return (0); + /* if we are not in the right thread, we need to wake up the loop */ + if (res != -1 && !EVTHREAD_IN_THREAD(base)) + write(base->th_notify_fd[1], "", 1); + + return (res); } void event_active(struct event *ev, int res, short ncalls) { + EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + + event_active_internal(ev, res, ncalls); + + EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); +} + + +static inline void +event_active_internal(struct event *ev, int res, short ncalls) +{ + struct event_base *base; + /* We get different kinds of events, add them together */ if (ev->ev_flags & EVLIST_ACTIVE) { ev->ev_res |= res; return; } + base = ev->ev_base; + ev->ev_res = res; ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL; - event_queue_insert(ev->ev_base, ev, EVLIST_ACTIVE); + event_queue_insert(base, ev, EVLIST_ACTIVE); } static int @@ -816,28 +895,36 @@ timeout_next(struct event_base *base, struct timeval **tv_p) struct timeval now; struct event *ev; struct timeval *tv = *tv_p; + int res = 0; + + EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + ev = min_heap_top(&base->timeheap); - if ((ev = min_heap_top(&base->timeheap)) == NULL) { + if (ev == NULL) { /* if no time-based events are active wait for I/O */ *tv_p = NULL; - return (0); + goto out; } - if (gettime(&now) == -1) - return (-1); + if (gettime(&now) == -1) { + res = -1; + goto out; + } if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { evutil_timerclear(tv); - return (0); + goto out; } evutil_timersub(&ev->ev_timeout, &now, tv); assert(tv->tv_sec >= 0); assert(tv->tv_usec >= 0); - event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec)); - return (0); + +out: + EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + return (res); } /* @@ -858,8 +945,11 @@ timeout_correct(struct event_base *base, struct timeval *tv) /* Check if time is running backwards */ gettime(tv); + EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + if (evutil_timercmp(tv, &base->event_tv, >=)) { base->event_tv = *tv; + EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); return; } @@ -877,6 +967,7 @@ timeout_correct(struct event_base *base, struct timeval *tv) struct timeval *ev_tv = &(**pev).ev_timeout; evutil_timersub(ev_tv, &off, ev_tv); } + EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); } void @@ -885,8 +976,12 @@ timeout_process(struct event_base *base) struct timeval now; struct event *ev; - if (min_heap_empty(&base->timeheap)) + EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); + if (min_heap_empty(&base->timeheap)) { + EVTHREAD_RELEASE_LOCK(base, + EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); return; + } gettime(&now); @@ -895,12 +990,13 @@ timeout_process(struct event_base *base) break; /* delete this event from the I/O queues */ - event_del(ev); + event_del_internal(ev); event_debug(("timeout_process: call %p", ev->ev_callback)); - event_active(ev, EV_TIMEOUT, 1); + event_active_internal(ev, EV_TIMEOUT, 1); } + EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, EVTHREAD_BASE_LOCK); } void @@ -1060,3 +1156,66 @@ event_set_mem_functions(void *(*malloc_fn)(size_t sz), _event_realloc_fn = realloc_fn; _event_free_fn = free_fn; } + +/* support for threading */ +void +evthread_set_locking_callback(struct event_base *base, + void (*locking_fn)(int mode, int locknum)) +{ +#ifdef DISABLE_THREAD_SUPPORT + event_errx(1, "%s: not compiled with thread support", __func__); +#endif + base->th_lock = locking_fn; +} + +static void +evthread_ignore_fd(int fd, short what, void *arg) +{ + struct event_base *base = arg; + int buf[128]; + + /* we draining the socket */ + while (read(fd, buf, sizeof(buf)) != -1) + ; + + event_add(&base->th_notify, NULL); +} + +void +evthread_set_id_callback(struct event_base *base, + unsigned long (*id_fn)(void)) +{ +#ifdef DISABLE_THREAD_SUPPORT + event_errx(1, "%s: not compiled with thread support", __func__); +#endif + base->th_get_id = id_fn; + base->th_owner_id = (*id_fn)(); + /* + * If another thread wants to add a new event, we need to notify + * the thread that owns the base to wakeup for rescheduling. + */ + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, + base->th_notify_fd) == -1) + event_err(1, "%s: socketpair", __func__); + + evutil_make_socket_nonblocking(base->th_notify_fd[0]); + evutil_make_socket_nonblocking(base->th_notify_fd[1]); + + /* prepare an event that we can use for wakeup */ + event_set(&base->th_notify, base->th_notify_fd[0], EV_READ, + evthread_ignore_fd, base); + event_base_set(base, &base->th_notify); + /* we need to mark this as internal event */ + base->th_notify.ev_flags |= EVLIST_INTERNAL; + + event_add(&base->th_notify, NULL); +} + +int +evthread_num_locks(void) +{ +#ifdef DISABLE_THREAD_SUPPORT + event_errx(1, "%s: not compiled with thread support", __func__); +#endif + return (EVTHREAD_NUM_LOCKS); +} diff --git a/evthread-internal.h b/evthread-internal.h new file mode 100644 index 00000000..6b8145c6 --- /dev/null +++ b/evthread-internal.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2008 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +#ifndef _EVTHREAD_INTERNAL_H_ +#define _EVTHREAD_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "config.h" + +enum evthread_locks { + EVTHREAD_BASE_LOCK = 0, + EVTHREAD_NUM_LOCKS = 1 +}; + +struct event_base; +#ifndef DISABLE_THREAD_SUPPORT +#define EVTHREAD_USE_LOCKS(base) \ + ((base)->th_lock != NULL) + +#define EVTHREAD_IN_THREAD(base) \ + ((base)->th_get_id == NULL || \ + (base)->th_owner_id == (*(base)->th_get_id)()) + +#define EVTHREAD_GET_ID(base) \ + (*(base)->th_get_id)() + +#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock) do { \ + if (EVTHREAD_USE_LOCKS(base)) \ + (*(base)->th_lock)(EVTHREAD_LOCK | mode, lock); \ + } while (0) + +#define EVTHREAD_RELEASE_LOCK(base, mode, lock) do { \ + if (EVTHREAD_USE_LOCKS(base)) \ + (*(base)->th_lock)(EVTHREAD_UNLOCK | mode, lock); \ + } while (0) +#else /* DISABLE_THREAD_SUPPORT */ +#define EVTHREAD_USE_LOCKS(base) +#define EVTHREAD_IN_THREAD(base) 1 +#define EVTHREAD_GET_ID(base) +#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock) +#define EVTHREAD_RELEASE_LOCK(base, mode, lock) +#endif + +#ifdef __cplusplus +} +#endif + +#endif /* _EVTHREAD_INTERNAL_H_ */ diff --git a/http.c b/http.c index 710c7522..338e6cfd 100644 --- a/http.c +++ b/http.c @@ -951,7 +951,8 @@ static void evhttp_connection_stop_detectclose(struct evhttp_connection *evcon) { evcon->flags &= ~EVHTTP_CON_CLOSEDETECT; - event_del(&evcon->close_ev); + if (event_initialized(&evcon->close_ev)) + event_del(&evcon->close_ev); } static void diff --git a/include/Makefile.am b/include/Makefile.am index b1a1b7ce..dadb974e 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -1,4 +1,4 @@ AUTOMAKE_OPTIONS = foreign -EXTRA_SRC = event2/buffer.h -nobase_include_HEADERS = event2/buffer.h +EXTRA_SRC = event2/buffer.h event2/thread.h +nobase_include_HEADERS = event2/buffer.h event2/thread.h diff --git a/m4/acx_pthread.m4 b/m4/acx_pthread.m4 new file mode 100644 index 00000000..d2b11694 --- /dev/null +++ b/m4/acx_pthread.m4 @@ -0,0 +1,279 @@ +##### http://autoconf-archive.cryp.to/acx_pthread.html +# +# SYNOPSIS +# +# ACX_PTHREAD([ACTION-IF-FOUND[, ACTION-IF-NOT-FOUND]]) +# +# DESCRIPTION +# +# This macro figures out how to build C programs using POSIX threads. +# It sets the PTHREAD_LIBS output variable to the threads library and +# linker flags, and the PTHREAD_CFLAGS output variable to any special +# C compiler flags that are needed. (The user can also force certain +# compiler flags/libs to be tested by setting these environment +# variables.) +# +# Also sets PTHREAD_CC to any special C compiler that is needed for +# multi-threaded programs (defaults to the value of CC otherwise). +# (This is necessary on AIX to use the special cc_r compiler alias.) +# +# NOTE: You are assumed to not only compile your program with these +# flags, but also link it with them as well. e.g. you should link +# with $PTHREAD_CC $CFLAGS $PTHREAD_CFLAGS $LDFLAGS ... $PTHREAD_LIBS +# $LIBS +# +# If you are only building threads programs, you may wish to use +# these variables in your default LIBS, CFLAGS, and CC: +# +# LIBS="$PTHREAD_LIBS $LIBS" +# CFLAGS="$CFLAGS $PTHREAD_CFLAGS" +# CC="$PTHREAD_CC" +# +# In addition, if the PTHREAD_CREATE_JOINABLE thread-attribute +# constant has a nonstandard name, defines PTHREAD_CREATE_JOINABLE to +# that name (e.g. PTHREAD_CREATE_UNDETACHED on AIX). +# +# ACTION-IF-FOUND is a list of shell commands to run if a threads +# library is found, and ACTION-IF-NOT-FOUND is a list of commands to +# run it if it is not found. If ACTION-IF-FOUND is not specified, the +# default action will define HAVE_PTHREAD. +# +# Please let the authors know if this macro fails on any platform, or +# if you have any other suggestions or comments. This macro was based +# on work by SGJ on autoconf scripts for FFTW (http://www.fftw.org/) +# (with help from M. Frigo), as well as ac_pthread and hb_pthread +# macros posted by Alejandro Forero Cuervo to the autoconf macro +# repository. We are also grateful for the helpful feedback of +# numerous users. +# +# LAST MODIFICATION +# +# 2007-07-29 +# +# COPYLEFT +# +# Copyright (c) 2007 Steven G. Johnson +# +# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see +# . +# +# As a special exception, the respective Autoconf Macro's copyright +# owner gives unlimited permission to copy, distribute and modify the +# configure scripts that are the output of Autoconf when processing +# the Macro. You need not follow the terms of the GNU General Public +# License when using or distributing such scripts, even though +# portions of the text of the Macro appear in them. The GNU General +# Public License (GPL) does govern all other use of the material that +# constitutes the Autoconf Macro. +# +# This special exception to the GPL applies to versions of the +# Autoconf Macro released by the Autoconf Macro Archive. When you +# make and distribute a modified version of the Autoconf Macro, you +# may extend this special exception to the GPL to apply to your +# modified version as well. + +AC_DEFUN([ACX_PTHREAD], [ +AC_REQUIRE([AC_CANONICAL_HOST]) +AC_LANG_SAVE +AC_LANG_C +acx_pthread_ok=no + +# We used to check for pthread.h first, but this fails if pthread.h +# requires special compiler flags (e.g. on True64 or Sequent). +# It gets checked for in the link test anyway. + +# First of all, check if the user has set any of the PTHREAD_LIBS, +# etcetera environment variables, and if threads linking works using +# them: +if test x"$PTHREAD_LIBS$PTHREAD_CFLAGS" != x; then + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + AC_MSG_CHECKING([for pthread_join in LIBS=$PTHREAD_LIBS with CFLAGS=$PTHREAD_CFLAGS]) + AC_TRY_LINK_FUNC(pthread_join, acx_pthread_ok=yes) + AC_MSG_RESULT($acx_pthread_ok) + if test x"$acx_pthread_ok" = xno; then + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" + fi + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" +fi + +# We must check for the threads library under a number of different +# names; the ordering is very important because some systems +# (e.g. DEC) have both -lpthread and -lpthreads, where one of the +# libraries is broken (non-POSIX). + +# Create a list of thread flags to try. Items starting with a "-" are +# C compiler flags, and other items are library names, except for "none" +# which indicates that we try without any flags at all, and "pthread-config" +# which is a program returning the flags for the Pth emulation library. + +acx_pthread_flags="pthreads none -Kthread -kthread lthread -pthread -pthreads -mthreads pthread --thread-safe -mt pthread-config" + +# The ordering *is* (sometimes) important. Some notes on the +# individual items follow: + +# pthreads: AIX (must check this before -lpthread) +# none: in case threads are in libc; should be tried before -Kthread and +# other compiler flags to prevent continual compiler warnings +# -Kthread: Sequent (threads in libc, but -Kthread needed for pthread.h) +# -kthread: FreeBSD kernel threads (preferred to -pthread since SMP-able) +# lthread: LinuxThreads port on FreeBSD (also preferred to -pthread) +# -pthread: Linux/gcc (kernel threads), BSD/gcc (userland threads) +# -pthreads: Solaris/gcc +# -mthreads: Mingw32/gcc, Lynx/gcc +# -mt: Sun Workshop C (may only link SunOS threads [-lthread], but it +# doesn't hurt to check since this sometimes defines pthreads too; +# also defines -D_REENTRANT) +# ... -mt is also the pthreads flag for HP/aCC +# pthread: Linux, etcetera +# --thread-safe: KAI C++ +# pthread-config: use pthread-config program (for GNU Pth library) + +case "${host_cpu}-${host_os}" in + *solaris*) + + # On Solaris (at least, for some versions), libc contains stubbed + # (non-functional) versions of the pthreads routines, so link-based + # tests will erroneously succeed. (We need to link with -pthreads/-mt/ + # -lpthread.) (The stubs are missing pthread_cleanup_push, or rather + # a function called by this macro, so we could check for that, but + # who knows whether they'll stub that too in a future libc.) So, + # we'll just look for -pthreads and -lpthread first: + + acx_pthread_flags="-pthreads pthread -mt -pthread $acx_pthread_flags" + ;; +esac + +if test x"$acx_pthread_ok" = xno; then +for flag in $acx_pthread_flags; do + + case $flag in + none) + AC_MSG_CHECKING([whether pthreads work without any flags]) + ;; + + -*) + AC_MSG_CHECKING([whether pthreads work with $flag]) + PTHREAD_CFLAGS="$flag" + ;; + + pthread-config) + AC_CHECK_PROG(acx_pthread_config, pthread-config, yes, no) + if test x"$acx_pthread_config" = xno; then continue; fi + PTHREAD_CFLAGS="`pthread-config --cflags`" + PTHREAD_LIBS="`pthread-config --ldflags` `pthread-config --libs`" + ;; + + *) + AC_MSG_CHECKING([for the pthreads library -l$flag]) + PTHREAD_LIBS="-l$flag" + ;; + esac + + save_LIBS="$LIBS" + save_CFLAGS="$CFLAGS" + LIBS="$PTHREAD_LIBS $LIBS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Check for various functions. We must include pthread.h, + # since some functions may be macros. (On the Sequent, we + # need a special flag -Kthread to make this header compile.) + # We check for pthread_join because it is in -lpthread on IRIX + # while pthread_create is in libc. We check for pthread_attr_init + # due to DEC craziness with -lpthreads. We check for + # pthread_cleanup_push because it is one of the few pthread + # functions on Solaris that doesn't have a non-functional libc stub. + # We try pthread_create on general principles. + AC_TRY_LINK([#include ], + [pthread_t th; pthread_join(th, 0); + pthread_attr_init(0); pthread_cleanup_push(0, 0); + pthread_create(0,0,0,0); pthread_cleanup_pop(0); ], + [acx_pthread_ok=yes]) + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + AC_MSG_RESULT($acx_pthread_ok) + if test "x$acx_pthread_ok" = xyes; then + break; + fi + + PTHREAD_LIBS="" + PTHREAD_CFLAGS="" +done +fi + +# Various other checks: +if test "x$acx_pthread_ok" = xyes; then + save_LIBS="$LIBS" + LIBS="$PTHREAD_LIBS $LIBS" + save_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS $PTHREAD_CFLAGS" + + # Detect AIX lossage: JOINABLE attribute is called UNDETACHED. + AC_MSG_CHECKING([for joinable pthread attribute]) + attr_name=unknown + for attr in PTHREAD_CREATE_JOINABLE PTHREAD_CREATE_UNDETACHED; do + AC_TRY_LINK([#include ], [int attr=$attr; return attr;], + [attr_name=$attr; break]) + done + AC_MSG_RESULT($attr_name) + if test "$attr_name" != PTHREAD_CREATE_JOINABLE; then + AC_DEFINE_UNQUOTED(PTHREAD_CREATE_JOINABLE, $attr_name, + [Define to necessary symbol if this constant + uses a non-standard name on your system.]) + fi + + AC_MSG_CHECKING([if more special flags are required for pthreads]) + flag=no + case "${host_cpu}-${host_os}" in + *-aix* | *-freebsd* | *-darwin*) flag="-D_THREAD_SAFE";; + *solaris* | *-osf* | *-hpux*) flag="-D_REENTRANT";; + esac + AC_MSG_RESULT(${flag}) + if test "x$flag" != xno; then + PTHREAD_CFLAGS="$flag $PTHREAD_CFLAGS" + fi + + LIBS="$save_LIBS" + CFLAGS="$save_CFLAGS" + + # More AIX lossage: must compile with xlc_r or cc_r + if test x"$GCC" != xyes; then + AC_CHECK_PROGS(PTHREAD_CC, xlc_r cc_r, ${CC}) + else + PTHREAD_CC=$CC + fi +else + PTHREAD_CC="$CC" +fi + +AC_SUBST(PTHREAD_LIBS) +AC_SUBST(PTHREAD_CFLAGS) +AC_SUBST(PTHREAD_CC) + +# Finally, execute ACTION-IF-FOUND/ACTION-IF-NOT-FOUND: +if test x"$acx_pthread_ok" = xyes; then + ifelse([$1],,AC_DEFINE(HAVE_PTHREAD,1,[Define if you have POSIX threads libraries and header files.]),[$1]) + : +else + acx_pthread_ok=no + $2 +fi +AC_LANG_RESTORE +])dnl ACX_PTHREAD diff --git a/test/Makefile.am b/test/Makefile.am index 9fa76708..0e254a80 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -17,9 +17,13 @@ test_weof_LDADD = ../libevent_core.la test_time_SOURCES = test-time.c test_time_LDADD = ../libevent_core.la regress_SOURCES = regress.c regress.h regress_http.c regress_dns.c \ - regress_rpc.c \ - regress.gen.c regress.gen.h -regress_LDADD = ../libevent.la + regress_rpc.c regress.gen.c regress.gen.h +if PTHREAD_REGRESS +regress_SOURCES += regress_pthread.c +endif +regress_LDADD = ../libevent.la $(PTHREAD_LIBS) +regress_CFLAGS = -I$(top_srcdir) -I$(top_srcdir)/compat \ + -I$(top_srcdir)/include $(PTHREAD_CFLAGS) bench_SOURCES = bench.c bench_LDADD = ../libevent.la bench_cascade_SOURCES = bench_cascade.c diff --git a/test/regress.c b/test/regress.c index 9f4f76b6..48d3a712 100644 --- a/test/regress.c +++ b/test/regress.c @@ -1727,12 +1727,16 @@ main (int argc, char **argv) test_event_base_new(); +#if defined(HAVE_PTHREADS) && !defined(DISABLE_THREAD_SUPPORT) + regress_pthread(); +#endif + http_suite(); rpc_suite(); dns_suite(); - + #ifndef WIN32 test_fork(); #endif diff --git a/test/regress.h b/test/regress.h index 4060ff5c..c5a4510a 100644 --- a/test/regress.h +++ b/test/regress.h @@ -37,6 +37,8 @@ void http_basic_test(void); void rpc_suite(void); void dns_suite(void); + +void regress_pthread(void); #ifdef __cplusplus } diff --git a/test/regress_pthread.c b/test/regress_pthread.c new file mode 100644 index 00000000..52d7e100 --- /dev/null +++ b/test/regress_pthread.c @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2008 Niels Provos + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. The name of the author may not be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +#include +#include + +#include "evutil.h" +#include "event.h" +#include "event2/thread.h" + +static pthread_mutex_t *all_locks; + +struct cond_wait { + pthread_mutex_t lock; + pthread_cond_t cond; +}; + +static void +basic_timeout(int fd, short what, void *arg) +{ + struct cond_wait *cw = arg; + assert(pthread_mutex_lock(&cw->lock) == 0); + assert(pthread_cond_broadcast(&cw->cond) == 0); + assert(pthread_mutex_unlock(&cw->lock) == 0); +} + +#define NUM_THREADS 100 +static pthread_mutex_t count_lock; +static int count; + +static void * +basic_thread(void *arg) +{ + struct cond_wait cw; + struct event_base *base = arg; + struct event ev; + int i = 0; + + assert(pthread_mutex_init(&cw.lock, NULL) == 0); + assert(pthread_cond_init(&cw.cond, NULL) == 0); + + evtimer_set(&ev, basic_timeout, &cw); + event_base_set(base, &ev); + for (i = 0; i < 100; i++) { + struct timeval tv; + evutil_timerclear(&tv); + assert(evtimer_add(&ev, &tv) == 0); + + assert(pthread_mutex_lock(&cw.lock) == 0); + assert(pthread_cond_wait(&cw.cond, &cw.lock) == 0); + assert(pthread_mutex_unlock(&cw.lock) == 0); + + assert(pthread_mutex_lock(&count_lock) == 0); + ++count; + assert(pthread_mutex_unlock(&count_lock) == 0); + } + + /* exit the loop only if all threads fired all timeouts */ + assert(pthread_mutex_lock(&count_lock) == 0); + if (count >= NUM_THREADS * 100) + event_base_loopexit(base, NULL); + assert(pthread_mutex_unlock(&count_lock) == 0); + + assert(pthread_cond_destroy(&cw.cond) == 0); + assert(pthread_mutex_destroy(&cw.lock) == 0); + + return (NULL); +} + +static void +pthread_basic(struct event_base *base) +{ + pthread_t threads[NUM_THREADS]; + struct event ev; + struct timeval tv; + int i; + + fprintf(stdout, "Testing basic pthreads support: "); + + for (i = 0; i < NUM_THREADS; ++i) + pthread_create(&threads[i], NULL, basic_thread, base); + + evtimer_set(&ev, NULL, NULL); + event_base_set(base, &ev); + evutil_timerclear(&tv); + tv.tv_sec = 1000; + event_add(&ev, &tv); + + event_base_dispatch(base); + + for (i = 0; i < NUM_THREADS; ++i) + pthread_join(threads[i], NULL); + + event_del(&ev); + + fprintf(stdout, "OK\n"); +} + +static void +locking(int mode, int locknum) +{ + pthread_mutex_t *lock = &all_locks[locknum]; + + if (mode & EVTHREAD_LOCK) + pthread_mutex_lock(lock); + else + pthread_mutex_unlock(lock); +} + +static unsigned long +get_id(void) +{ + return (unsigned long)(pthread_self()); +} + +void +regress_pthread() +{ + struct event_base *base = event_base_new(); + int i; + + pthread_mutex_init(&count_lock, NULL); + + all_locks = malloc(evthread_num_locks() * sizeof(pthread_mutex_t)); + for (i = 0; i < evthread_num_locks(); ++i) + pthread_mutex_init(&all_locks[i], NULL); + + evthread_set_locking_callback(base, locking); + evthread_set_id_callback(base, get_id); + + pthread_basic(base); + + for (i = 0; i < evthread_num_locks(); ++i) + pthread_mutex_destroy(&all_locks[i]); + + pthread_mutex_destroy(&count_lock); + + event_base_free(base); +}