From ec4cfa33c91b9df4dc1621315999cba60cc839e2 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 19 Jan 2009 01:34:14 +0000 Subject: [PATCH] Make event_break threadsafe; make notify-thread mechanism a little more generic; let it use pipes where they work. svn:r1019 --- configure.in | 2 +- event.c | 90 +++++++++++++++++++++++++++++++++-------- include/event2/thread.h | 6 +++ 3 files changed, 80 insertions(+), 18 deletions(-) diff --git a/configure.in b/configure.in index 3ebe498f..6518659f 100644 --- a/configure.in +++ b/configure.in @@ -146,7 +146,7 @@ AC_C_INLINE AC_HEADER_TIME dnl Checks for library functions. -AC_CHECK_FUNCS(gettimeofday vasprintf fcntl clock_gettime strtok_r strsep getaddrinfo getnameinfo strlcpy inet_ntop inet_pton signal sigaction strtoll inet_aton) +AC_CHECK_FUNCS(gettimeofday vasprintf fcntl clock_gettime strtok_r strsep getaddrinfo getnameinfo strlcpy inet_ntop inet_pton signal sigaction strtoll inet_aton pipe) AC_CHECK_SIZEOF(long) diff --git a/event.c b/event.c index f780cacc..124aa07a 100644 --- a/event.c +++ b/event.c @@ -144,6 +144,10 @@ static void timeout_correct(struct event_base *, struct timeval *); static void event_signal_closure(struct event_base *, struct event *ev); static void event_periodic_closure(struct event_base *, struct event *ev); +static int evthread_notify_base(struct event_base *base, ev_uint8_t msg); +#define EVTHREAD_NOTIFY_MSG_RECALC 0 +#define EVTHREAD_NOTIFY_MSG_LOOPBREAK 1 + static void detect_monotonic(void) { @@ -675,7 +679,6 @@ event_base_loopexit(struct event_base *event_base, const struct timeval *tv) event_base, tv)); } -/* not thread safe */ int event_loopbreak(void) { @@ -688,8 +691,12 @@ event_base_loopbreak(struct event_base *event_base) if (event_base == NULL) return (-1); - event_base->event_break = 1; - return (0); + if (!EVTHREAD_IN_THREAD(event_base)) { + return evthread_notify_base(event_base, EVTHREAD_NOTIFY_MSG_LOOPBREAK); + } else { + event_base->event_break = 1; + return (0); + } } @@ -1030,6 +1037,16 @@ event_add(struct event *ev, const struct timeval *tv) return (res); } +static int +evthread_notify_base(struct event_base *base, ev_uint8_t msg) +{ + char buf[1]; + int r; + buf[0] = (char)msg; + r = send(base->th_notify_fd[1], buf, 1, 0); + return (r < 0) ? -1 : 0; +} + static inline int event_add_internal(struct event *ev, const struct timeval *tv) { @@ -1110,7 +1127,7 @@ event_add_internal(struct event *ev, const struct timeval *tv) /* if we are not in the right thread, we need to wake up the loop */ if (res != -1 && !EVTHREAD_IN_THREAD(base)) - send(base->th_notify_fd[1], "", 1, 0); + evthread_notify_base(base, EVTHREAD_NOTIFY_MSG_RECALC); return (res); } @@ -1170,8 +1187,8 @@ event_del_internal(struct event *ev) /* if we are not in the right thread, we need to wake up the loop */ if (res != -1 && !EVTHREAD_IN_THREAD(base)) - send(base->th_notify_fd[1], "", 1, 0); - + evthread_notify_base(base, EVTHREAD_NOTIFY_MSG_RECALC); + return (res); } @@ -1181,7 +1198,7 @@ event_active(struct event *ev, int res, short ncalls) EVTHREAD_ACQUIRE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock); event_active_internal(ev, res, ncalls); - + EVTHREAD_RELEASE_LOCK(ev->ev_base, EVTHREAD_WRITE, th_base_lock); } @@ -1482,14 +1499,22 @@ evthread_set_locking_callback(struct event_base *base, } static void -evthread_ignore_fd(int fd, short what, void *arg) +evthread_notification_callback(int fd, short what, void *arg) { struct event_base *base = arg; - char buf[128]; - + unsigned char buf[128]; + int n, i; + /* we're draining the socket */ - while (recv(fd, buf, sizeof(buf), 0) != -1) - ; + while ((n = recv(fd, (char*)buf, sizeof(buf), 0)) != -1) { + for (i=0;ith_notify, NULL); } @@ -1508,25 +1533,56 @@ evthread_set_id_callback(struct event_base *base, #endif base->th_get_id = id_fn; base->th_owner_id = (*id_fn)(); + /* * If another thread wants to add a new event, we need to notify * the thread that owns the base to wakeup for rescheduling. */ - if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, - base->th_notify_fd) == -1) - event_sock_err(1, -1, "%s: socketpair", __func__); + evthread_make_base_notifiable(base); +} + +int +evthread_make_base_notifiable(struct event_base *base) +{ + if (!base) + return -1; + + if (base->th_notify_fd[0] >= 0) + return 0; + +#if defined(XXX_EVENT_HAVE_PIPE) + if ((base->evsel->features & EV_FEATURE_FDS)) { + if (pipe(base->th_notify_fd) < 0) + event_warn(1, "%s: pipe", __func__); + } + if (base->th_notify_fd[0] < 0) +#endif + { + if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, + base->th_notify_fd) == -1) { + event_sock_warn(-1, "%s: socketpair", __func__); + return (-1); + } + } evutil_make_socket_nonblocking(base->th_notify_fd[0]); - evutil_make_socket_nonblocking(base->th_notify_fd[1]); + + // This can't be right, can it? We want writes to this socket to + // just succeed. + // evutil_make_socket_nonblocking(base->th_notify_fd[1]); /* prepare an event that we can use for wakeup */ event_assign(&base->th_notify, base, base->th_notify_fd[0], EV_READ, - evthread_ignore_fd, base); + evthread_notification_callback, base); /* we need to mark this as internal event */ base->th_notify.ev_flags |= EVLIST_INTERNAL; + /* XXX th_notify should have a very high priority. */ + event_add(&base->th_notify, NULL); + + return 0; } void diff --git a/include/event2/thread.h b/include/event2/thread.h index 5de97bc4..2ef7f6bf 100644 --- a/include/event2/thread.h +++ b/include/event2/thread.h @@ -96,6 +96,12 @@ void evthread_set_locking_callback(struct event_base *base, void evthread_set_id_callback(struct event_base *base, unsigned long (*id_fn)(void)); +/** Make sure it's safe to tell an event base to wake up from another thread. + + @return 0 on success, -1 on failure. + */ +int evthread_make_base_notifiable(struct event_base *base); + #ifdef __cplusplus } #endif -- 2.40.0