#include "evmap-internal.h"
#include "event2/thread.h"
#include "evthread-internal.h"
+#include "changelist-internal.h"
#define NEVENT 64
struct kqop {
struct kevent *changes;
- int nchanges;
int changes_size;
struct kevent *pend_changes;
- int n_pend_changes;
int pend_changes_size;
struct kevent *events;
static void kqop_free(struct kqop *kqop);
static void *kq_init (struct event_base *);
-static int kq_add (struct event_base *, int, short, short, void *);
-static int kq_del (struct event_base *, int, short, short, void *);
static int kq_sig_add (struct event_base *, int, short, short, void *);
static int kq_sig_del (struct event_base *, int, short, short, void *);
static int kq_dispatch (struct event_base *, struct timeval *);
-static int kq_insert (struct kqop *, struct kevent *);
static void kq_dealloc (struct event_base *);
const struct eventop kqops = {
"kqueue",
kq_init,
- kq_add,
- kq_del,
+ event_changelist_add,
+ event_changelist_del,
kq_dispatch,
kq_dealloc,
1 /* need reinit */,
EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_FDS,
- 0
+ EVENT_CHANGELIST_FDINFO_SIZE
};
static const struct eventop kqsigops = {
if (!(kqueueop = mm_calloc(1, sizeof(struct kqop))))
return (NULL);
- /* Initialize the kernel queue */
+/* Initialize the kernel queue */
if ((kq = kqueue()) == -1) {
event_warn("kqueue");
return (NULL);
}
-static int
-kq_insert(struct kqop *kqop, struct kevent *kev)
+static void
+kq_sighandler(int sig)
{
- int size = kqop->changes_size;
-
- if (kqop->nchanges == size) {
- struct kevent *newchange;
-
- size *= 2;
-
- newchange = mm_realloc(kqop->changes,
- size * sizeof(struct kevent));
- if (newchange == NULL) {
- event_warn("%s: malloc", __func__);
- return (-1);
- }
- kqop->changes = newchange;
- kqop->changes_size = size;
- }
-
- memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));
-
- event_debug(("%s: fd %d %s%s",
- __func__, (int)kev->ident,
- kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE",
- kev->flags == EV_DELETE ? " (del)" : ""));
-
- return (0);
+ /* Do nothing here */
}
static void
-kq_sighandler(int sig)
+kq_setup_kevent(struct kevent *out, evutil_socket_t fd, int filter, short change)
{
- /* Do nothing here */
+ memset(out, 0, sizeof(out));
+ out->ident = fd;
+ out->filter = filter;
+
+ if (change & EV_CHANGE_ADD) {
+ out->flags = EV_ADD;
+ if (change & EV_ET)
+ out->flags |= EV_CLEAR;
+#ifdef NOTE_EOF
+ /* Make it behave like select() and poll() */
+ if (filter == EVFILT_READ)
+ out->fflags = NOTE_EOF;
+#endif
+ } else {
+ EVUTIL_ASSERT(change & EV_CHANGE_DEL);
+ out->flags = EV_DELETE;
+ }
}
#define SWAP(tp,a,b) \
b = tmp_swap_var; \
} while (0);
+
+static int
+kq_build_changes_list(const struct event_changelist *changelist,
+ struct kqop *kqop)
+{
+ int i;
+ int n_changes = 0;
+
+ for (i = 0; i < changelist->n_changes; ++i) {
+ struct event_change *in_ch = &changelist->changes[i];
+ struct kevent *out_ch;
+ if (n_changes >= kqop->changes_size - 1) {
+ int newsize = kqop->changes_size * 2;
+ struct kevent *newchanges;
+
+ newchanges = mm_realloc(kqop->changes,
+ newsize * sizeof(struct kevent));
+ if (newchanges == NULL) {
+ event_warn("%s: realloc", __func__);
+ return (-1);
+ }
+ kqop->changes = newchanges;
+ kqop->changes_size = newsize;
+ }
+ if (in_ch->read_change) {
+ out_ch = &kqop->changes[n_changes++];
+ kq_setup_kevent(out_ch, in_ch->fd, EVFILT_READ,
+ in_ch->read_change);
+ }
+ if (in_ch->write_change) {
+ out_ch = &kqop->changes[n_changes++];
+ kq_setup_kevent(out_ch, in_ch->fd, EVFILT_WRITE,
+ in_ch->write_change);
+ }
+ }
+ return n_changes;
+}
+
static int
kq_dispatch(struct event_base *base, struct timeval *tv)
{
struct kqop *kqop = base->evbase;
struct kevent *events = kqop->events;
struct timespec ts, *ts_p = NULL;
- int i, res;
+ int i, n_changes, res;
if (tv != NULL) {
TIMEVAL_TO_TIMESPEC(tv, &ts);
ts_p = &ts;
}
+ /* Build "changes" from "base->changes" */
+ n_changes = kq_build_changes_list(&base->changelist, kqop);
+ if (n_changes < 0)
+ return -1;
+
+ event_changelist_remove_all(&base->changelist, base);
+
/* We can't hold the lock while we're calling kqueue, so another
* thread might potentially mess with changes before the kernel has a
* chance to read it. Therefore, we need to keep the change list
* we're looking at in pend_changes, and let other threads mess with
* changes. */
SWAP(struct kevent *, kqop->changes, kqop->pend_changes);
- SWAP(int, kqop->nchanges, kqop->n_pend_changes);
SWAP(int, kqop->changes_size, kqop->pend_changes_size);
EVBASE_RELEASE_LOCK(base, th_base_lock);
- res = kevent(kqop->kq, kqop->pend_changes, kqop->n_pend_changes,
+ res = kevent(kqop->kq, kqop->pend_changes, n_changes,
events, kqop->events_size, ts_p);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
- kqop->n_pend_changes = 0;
if (res == -1) {
if (errno != EINTR) {
event_warn("kevent");
return (0);
}
-
-static int
-kq_add(struct event_base *base, int fd, short old, short events, void *p)
-{
- struct kqop *kqop = base->evbase;
- struct kevent kev;
- (void) p;
-
- if (events & EV_READ) {
- memset(&kev, 0, sizeof(kev));
- kev.ident = fd;
- kev.filter = EVFILT_READ;
-#ifdef NOTE_EOF
- /* Make it behave like select() and poll() */
- kev.fflags = NOTE_EOF;
-#endif
- kev.flags = EV_ADD;
- if (events & EV_ET)
- kev.flags |= EV_CLEAR;
-
- if (kq_insert(kqop, &kev) == -1)
- return (-1);
- }
-
- if (events & EV_WRITE) {
- memset(&kev, 0, sizeof(kev));
- kev.ident = fd;
- kev.filter = EVFILT_WRITE;
- kev.flags = EV_ADD;
- if (events & EV_ET)
- kev.flags |= EV_CLEAR;
-
- if (kq_insert(kqop, &kev) == -1)
- return (-1);
- }
-
- return (0);
-}
-
-static int
-kq_del(struct event_base *base, int fd, short old, short events, void *p)
-{
- struct kqop *kqop = base->evbase;
- struct kevent kev;
- (void) p;
-
- if (events & EV_READ) {
- memset(&kev, 0, sizeof(kev));
- kev.ident = fd;
- kev.filter = EVFILT_READ;
- kev.flags = EV_DELETE;
-
- if (kq_insert(kqop, &kev) == -1)
- return (-1);
- }
-
- if (events & EV_WRITE) {
- memset(&kev, 0, sizeof(kev));
- kev.ident = fd;
- kev.filter = EVFILT_WRITE;
- kev.flags = EV_DELETE;
-
- if (kq_insert(kqop, &kev) == -1)
- return (-1);
- }
-
- return (0);
-}
-
static void
kqop_free(struct kqop *kqop)
{