struct timeval tv_cache;
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* threading support */
unsigned long th_owner_id;
- unsigned long (*th_get_id)(void);
void *th_base_lock;
-
- /* allocate/free locks */
- void *(*th_alloc)(void);
- void (*th_free)(void *lock);
-
- /* lock or unlock a lock */
- void (*th_lock)(int mode, void *lock);
+#endif
/* Notify main thread to wake up break, etc. */
int th_notify_fd[2];
TAILQ_HEAD(event_configq, event_config_entry) entries;
enum event_method_feature require_features;
+ enum event_base_config_flag flags;
};
/* Internal use only: Functions that might be missing from <sys/queue.h> */
#endif
#endif /* _EVENT_INTERNAL_H_ */
+
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
+ if (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK)) {
+ int r;
+ EVTHREAD_ALLOC_LOCK(base->th_base_lock);
+ r = evthread_make_base_notifiable(base);
+ if (r<0) {
+ event_base_free(base);
+ return NULL;
+ }
+ }
+
return (base);
}
base->th_notify_fd[1] = -1;
}
- if (base->th_base_lock != NULL)
- (*base->th_free)(base->th_base_lock);
+ EVTHREAD_FREE_LOCK(base->th_base_lock);
/* Delete all non-internal events. */
for (ev = TAILQ_FIRST(&base->eventqueue); ev; ) {
struct event_list *activeq = NULL;
int i;
- EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
ev->ev_res & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_callback));
- EVTHREAD_RELEASE_LOCK(base,
+ EVBASE_RELEASE_LOCK(base,
EVTHREAD_WRITE, th_base_lock);
if (ev->ev_closure != NULL)
(int)ev->ev_fd, ev->ev_res, ev->ev_arg);
if (event_gotsig || base->event_break)
return;
- EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
/*
if (event_base == NULL)
return (-1);
- EVTHREAD_ACQUIRE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
event_base->event_break = 1;
- EVTHREAD_RELEASE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(event_base, EVTHREAD_WRITE, th_base_lock);
- if (!EVTHREAD_IN_THREAD(event_base)) {
+ if (!EVBASE_IN_THREAD(event_base)) {
return evthread_notify_base(event_base);
} else {
return (0);
if (base->sig.ev_signal_added)
evsig_base = base;
done = 0;
+
+#ifndef _EVENT_DISABLE_THREAD_SUPPORT
+ base->th_owner_id = EVTHREAD_GET_ID();
+#endif
+
while (!done) {
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
{
int res;
- EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
res = event_add_internal(ev, tv);
- EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
}
/* if we are not in the right thread, we need to wake up the loop */
- if (res != -1 && !EVTHREAD_IN_THREAD(base))
+ if (res != -1 && !EVBASE_IN_THREAD(base))
evthread_notify_base(base);
return (res);
{
int res;
- EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
res = event_del_internal(ev);
- EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
}
/* if we are not in the right thread, we need to wake up the loop */
- if (res != -1 && !EVTHREAD_IN_THREAD(base))
+ if (res != -1 && !EVBASE_IN_THREAD(base))
evthread_notify_base(base);
return (res);
void
event_active(struct event *ev, int res, short ncalls)
{
- EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
event_active_internal(ev, res, ncalls);
- EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock);
}
struct timeval *tv = *tv_p;
int res = 0;
- EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
ev = min_heap_top(&base->timeheap);
if (ev == NULL) {
event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
out:
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return (res);
}
/* Check if time is running backwards */
gettime(base, tv);
- EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (evutil_timercmp(tv, &base->event_tv, >=)) {
base->event_tv = *tv;
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
struct timeval *ev_tv = &(**pev).ev_timeout;
evutil_timersub(ev_tv, &off, ev_tv);
}
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
void
struct timeval now;
struct event *ev;
- EVTHREAD_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_ACQUIRE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
if (min_heap_empty(&base->timeheap)) {
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
return;
}
ev->ev_callback));
event_active_internal(ev, EV_TIMEOUT, 1);
}
- EVTHREAD_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
+ EVBASE_RELEASE_LOCK(base, EVTHREAD_WRITE, th_base_lock);
}
void
_mm_free_fn = free_fn;
}
+
/* support for threading */
+void (*_evthread_locking_fn)(int mode, void *lock) = NULL;
+unsigned long (*_evthread_id_fn)(void) = NULL;
+void *(*_evthread_lock_alloc_fn)(void) = NULL;
+void (*_evthread_lock_free_fn)(void *) = NULL;
+
void
-evthread_set_locking_callback(struct event_base *base,
- void (*locking_fn)(int mode, void *lock))
+evthread_set_locking_callback(void (*locking_fn)(int mode, void *lock))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
+#else
+ _evthread_locking_fn = locking_fn;
#endif
- base->th_lock = locking_fn;
}
#if defined(_EVENT_HAVE_EVENTFD) && defined(_EVENT_HAVE_SYS_EVENTFD_H)
}
void
-evthread_set_id_callback(struct event_base *base,
- unsigned long (*id_fn)(void))
+evthread_set_id_callback(unsigned long (*id_fn)(void))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
-#endif
-#ifdef WIN32
-#define LOCAL_SOCKETPAIR_AF AF_INET
#else
-#define LOCAL_SOCKETPAIR_AF AF_UNIX
+ _evthread_id_fn = id_fn;
#endif
- base->th_get_id = id_fn;
- base->th_owner_id = (*id_fn)();
-
- /*
- * If another thread wants to add a new event, we need to notify
- * the thread that owns the base to wakeup for rescheduling.
- */
- evthread_make_base_notifiable(base);
}
int
}
}
if (base->th_notify_fd[0] < 0)
+#endif
+
+#ifdef WIN32
+#define LOCAL_SOCKETPAIR_AF AF_INET
+#else
+#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
{
if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
- base->th_notify_fd) == -1) {
+ base->th_notify_fd) == -1) {
event_sock_warn(-1, "%s: socketpair", __func__);
return (-1);
}
}
void
-evthread_set_lock_create_callbacks(struct event_base *base,
- void *(*alloc_fn)(void), void (*free_fn)(void *))
+evthread_set_lock_create_callbacks(void *(*alloc_fn)(void),
+ void (*free_fn)(void *))
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
event_errx(1, "%s: not compiled with thread support", __func__);
+#else
+ _evthread_lock_alloc_fn = alloc_fn;
+ _evthread_lock_free_fn = free_fn;
#endif
- base->th_alloc = alloc_fn;
- base->th_free = free_fn;
-
- /* now, let's allocate our lock */
- base->th_base_lock = (*alloc_fn)();
}
void
#define _EVTHREAD_INTERNAL_H_
#ifdef __cplusplus
-extern "C" {
+//extern "C" {
#endif
#include "event-config.h"
struct event_base;
+
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
-#define EVTHREAD_USE_LOCKS(base) \
- (base != NULL && (base)->th_lock != NULL)
+extern void (*_evthread_locking_fn)(int mode, void *lock);
+extern unsigned long (*_evthread_id_fn)(void);
+extern void *(*_evthread_lock_alloc_fn)(void);
+extern void (*_evthread_lock_free_fn)(void *);
+
+#define EVBASE_USING_LOCKS(base) \
+ (base != NULL && (base)->th_base_lock != NULL)
+
+#define EVTHREAD_GET_ID() \
+ (_evthread_id_fn ? _evthread_id_fn() : 1)
-#define EVTHREAD_IN_THREAD(base) \
- ((base)->th_get_id == NULL || \
- (base)->th_owner_id == (*(base)->th_get_id)())
+#define EVBASE_IN_THREAD(base) \
+ (_evthread_id_fn == NULL || \
+ (base)->th_owner_id == _evthread_id_fn())
-#define EVTHREAD_GET_ID(base) \
- (*(base)->th_get_id)()
+#define EVTHREAD_ALLOC_LOCK(lockvar) \
+ ((lockvar) = _evthread_lock_alloc_fn ? \
+ _evthread_lock_alloc_fn() : NULL)
-#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock) do { \
- if (EVTHREAD_USE_LOCKS(base)) \
- (*(base)->th_lock)(EVTHREAD_LOCK | mode, \
- (base)->lock); \
+#define EVTHREAD_FREE_LOCK(lockvar) \
+ do { \
+ if (lockvar && _evthread_lock_free_fn) \
+ _evthread_lock_free_fn(lockvar); \
+ } while (0);
+
+#define EVBASE_ACQUIRE_LOCK(base, mode, lock) do { \
+ if (EVBASE_USING_LOCKS(base)) \
+ _evthread_locking_fn(EVTHREAD_LOCK | mode, \
+ (base)->lock); \
} while (0)
-#define EVTHREAD_RELEASE_LOCK(base, mode, lock) do { \
- if (EVTHREAD_USE_LOCKS(base)) \
- (*(base)->th_lock)(EVTHREAD_UNLOCK | mode, \
- (base)->lock); \
+#define EVBASE_RELEASE_LOCK(base, mode, lock) do { \
+ if (EVBASE_USING_LOCKS(base)) \
+ _evthread_locking_fn(EVTHREAD_UNLOCK | mode, \
+ (base)->lock); \
} while (0)
#else /* _EVENT_DISABLE_THREAD_SUPPORT */
-#define EVTHREAD_USE_LOCKS(base)
-#define EVTHREAD_IN_THREAD(base) 1
-#define EVTHREAD_GET_ID(base)
-#define EVTHREAD_ACQUIRE_LOCK(base, mode, lock)
-#define EVTHREAD_RELEASE_LOCK(base, mode, lock)
+#define EVTHREAD_GET_ID() 1
+#define EVTHREAD_ALLOC_LOCK(lockvar)
+#define EVTHREAD_FREE_LOCK(lockvar)
+
+#define EVBASE_IN_THREAD() 1
+#define EVBASE_ACQUIRE_LOCK(base, mode, lock)
+#define EVBASE_RELEASE_LOCK(base, mode, lock)
#endif
#ifdef __cplusplus
#include "event-config.h"
#endif
+/* With glibc we need to define this to get PTHREAD_MUTEX_RECURSIVE. */
+#define _GNU_SOURCE
#include <pthread.h>
+
struct event_base;
#include <event2/thread.h>
#include "mm-internal.h"
+static pthread_mutexattr_t attr_recursive;
+
static void *
evthread_posix_lock_create(void)
{
pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t));
if (!lock)
return NULL;
- pthread_mutex_init(lock, NULL);
+ if (pthread_mutex_init(lock, &attr_recursive)) {
+ mm_free(lock);
+ return NULL;
+ }
return lock;
}
}
int
-evthread_use_pthreads(struct event_base *base)
+evthread_use_pthreads(void)
{
- evthread_set_lock_create_callbacks(base,
- evthread_posix_lock_create,
- evthread_posix_lock_free);
- evthread_set_locking_callback(base, evthread_posix_lock);
- evthread_set_id_callback(base, evthread_posix_get_id);
+ /* Set ourselves up to get recursive locks. */
+ pthread_mutexattr_init(&attr_recursive);
+ pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE);
+
+ evthread_set_lock_create_callbacks(
+ evthread_posix_lock_create,
+ evthread_posix_lock_free);
+ evthread_set_locking_callback(evthread_posix_lock);
+ evthread_set_id_callback(evthread_posix_get_id);
return -1;
}
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#undef WIN32_LEAN_AND_MEAN
+#include <sys/locking.h>
#endif
struct event_base;
}
int
-evthread_use_windows_threads(struct event_base *base)
+evthread_use_windows_threads(void)
{
- evthread_set_lock_create_callbacks(base,
- evthread_win32_lock_create,
- evthread_win32_lock_free);
- evthread_set_locking_callback(base, evthread_win32_lock);
- evthread_set_id_callback(base, evthread_win32_get_id);
+ evthread_set_lock_create_callbacks(
+ evthread_win32_lock_create,
+ evthread_win32_lock_free);
+ evthread_set_locking_callback(evthread_win32_lock);
+ evthread_set_id_callback(evthread_win32_get_id);
return 0;
}
EV_FEATURE_FDS = 0x04,
};
+enum event_base_config_flag {
+ EVENT_BASE_FLAG_NOLOCK = 0x01,
+};
+
/**
Return a bitmask of the features implemented by an event base.
*/
int event_config_require_features(struct event_config *cfg,
enum event_method_feature feature);
+/** Sets a flag to configure what parts of the eventual event_base will
+ * be initialized, and how they'll work. */
+int event_config_set_flag(struct event_config *cfg,
+ enum event_base_config_flag flag);
+
/**
Initialize the event API.
add and delete events from a single event base, libevent needs to
lock its data structures.
+ Like the memory-management function hooks, all of the threading functions
+ _must_ be set up before an event_base is created if you want the base to
+ use them.
+
A multi-threaded application must provide locking functions to
libevent via evthread_set_locking_callback(). Libevent will invoke
this callback whenever a lock needs to be acquired or released.
evthread_set_locking_callback() before using libevent in a
multi-threaded application.
+ Locks must be recursive. That is, it must be safe for a thread to
+ acquire a lock that it already holds.
+
@param alloc_fn function to be called when allocating a new lock
@param free_fn function to be called to a free a lock
*/
-void evthread_set_lock_create_callbacks(struct event_base *base,
+void evthread_set_lock_create_callbacks(
void *(*alloc_fn)(void), void (*free_fn)(void *));
-struct event_base;
/**
Sets the function libevent should use for locking.
- @param base the event base for which the locking function should be set
@param locking_fn the function that libevent should invoke to acquire
or release a lock. mode has either EVTHREAD_LOCK or EVTHREAD_UNLOCK
set, and in addition, either EVHTREAD_WRITE or EVTREAD_READ.
*/
-void evthread_set_locking_callback(struct event_base *base,
+void evthread_set_locking_callback(
void (*locking_fn)(int mode, void *lock));
/**
@param id_fn the identify function libevent should invoke to
determine the identity of a thread.
*/
-void evthread_set_id_callback(struct event_base *base,
+void evthread_set_id_callback(
unsigned long (*id_fn)(void));
/** Make sure it's safe to tell an event base to wake up from another thread.
#ifdef WIN32
/** Sets up libevent for use with Windows builtin locking and thread ID
- functions. Unavailable if libevent is not build for Windows.
+ functions. Unavailable if libevent is not built for Windows.
@return 0 on success, -1 on failure. */
-int evthread_use_windows_threads(struct event_base *base);
+int evthread_use_windows_threads(void);
#endif
#ifdef _EVENT_HAVE_PTHREADS
libraries to link against libevent_pthreads as well as libevent.
@return 0 on success, -1 on failure. */
-int evthread_use_pthreads(struct event_base *base);
+int evthread_use_pthreads(void);
#endif
#ifdef __cplusplus
$(regress_pthread_SOURCES) $(regress_zlib_SOURCES)
if PTHREADS
regress_pthread_SOURCES = regress_pthread.c
+PTHREAD_LIBS += ../libevent_pthreads.la
endif
if ZLIB_REGRESS
regress_zlib_SOURCES = regress_zlib.c
fprintf(stdout, "OK\n");
}
-static void
-locking(int mode, void *lock)
-{
- if (mode & EVTHREAD_LOCK)
- pthread_mutex_lock(lock);
- else
- pthread_mutex_unlock(lock);
-}
-
-static void *
-alloc_lock(void)
-{
- pthread_mutex_t *lock = malloc(sizeof(*lock));
- assert(lock != NULL);
-
- pthread_mutex_init(lock, NULL);
-
- return (lock);
-}
-
-static void
-free_lock(void *lock)
-{
- pthread_mutex_destroy(lock);
- free(lock);
-}
-
-static unsigned long
-get_id(void)
-{
- union {
- pthread_t thr;
- unsigned long id;
- } r;
- r.id = 0;
- r.thr = pthread_self();
- return r.id;
-}
-
void
regress_pthread(void)
{
- struct event_base *base = event_base_new();
+ struct event_base *base;
pthread_mutex_init(&count_lock, NULL);
- evthread_set_lock_create_callbacks(base, alloc_lock, free_lock);
- evthread_set_locking_callback(base, locking);
- evthread_set_id_callback(base, get_id);
+ evthread_use_pthreads();
+
+ base = event_base_new();
+ if (evthread_make_base_notifiable(base)<0) {
+ puts("Couldn't make base notifiable!");
+ return;
+ }
pthread_basic(base);