]> granicus.if.org Git - libevent/commitdiff
improved/well-completely rewritten rtsig support by Mathew Mills; fix some
authorNiels Provos <provos@gmail.com>
Sun, 26 Feb 2006 20:18:35 +0000 (20:18 +0000)
committerNiels Provos <provos@gmail.com>
Sun, 26 Feb 2006 20:18:35 +0000 (20:18 +0000)
cases where regress would not pass on Linux

svn:r204

rtsig.c
test/regress.c

diff --git a/rtsig.c b/rtsig.c
index 32a782cdee9845b696bdf2e1b110f6df40fec243..29aade6948dcdc23c2927e9c3dc7aa6453ff74b1 100644 (file)
--- a/rtsig.c
+++ b/rtsig.c
@@ -1,3 +1,152 @@
+/*
+ * Copyright (c) 2006 Mathew Mills <mathewmills@mac.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+/*
+ * Meta-level comments: You know that a kernel interface is wrong if
+ * supporting it requires three times more code than any of the other
+ * kernel interfaces supported in libevent.  Niels - 2006-02-22
+ */
+/**
+
+   "RTSIG" is a shorthand for using O_ASYNC to make descriptors send
+   signals when readable/writable and to use POSIX real-time signals
+   witch are queued unlike normal signals.  At first blush this may
+   seem like a alternative to epoll, but a number of problems arise
+   when attempting to build an eventloop entirely out of rtsig.
+   Still, we can use rtsig in combination with poll() to
+   provide an eventloop that allows for many thousands of sockets
+   without huge overheads implicit with using select() or poll()
+   alone.  epoll and kqueue are far superior to rtsig and should be
+   used where available, but rtsig has been in standard Linux kernels
+   for a long time and have a huge installation base.  epoll requires
+   special patches for 2.4 kernels and 2.6 kernels are not yet nearly
+   so ubiquitous.
+
+   rtsig problems:
+    - O_ASYNC mechanisms work only on sockets - not pipes or tty's
+
+    - O_ASYNC signals are edge-triggered, POLLIN on packet arriving
+   or socket close; POLLOUT when a socket transitions from
+   non-writable to writable.  Being edge-triggered means the
+   event-handler callbacks must transition the level ( reading
+   completely the socket buffer contents ) or it will be unable to
+   reliably receive notification again.
+
+   - rtsig implementations must be intimately involved in how a
+   process dispatches signals.
+
+   - delivering signals per-event can be expensive, CPU-wise, but
+     sigtimedwait() blocks on signals only and means non-sockets
+     cannot be serviced.
+
+   Theory of operation:
+    This libevent module uses rtsig to allow us to manage a set of
+    poll-event descriptors.  We can drop uninteresting fd's from the
+    pollset if the fd will send a signal when it becomes interesting
+    again.
+
+    poll() offers us level-triggering and, when we have verified the
+    level of a socket, we can trust the edge-trigger nature of the
+    ASYNC signal.
+
+    As an eventloop we must poll for external events but leverage
+    kernel functionality to sleep between events ( until the loop's
+    next scheduled timed event ).
+
+    If we are polling on any non-sockets then we simply have no choice
+    about blocking on the poll() call.  If we blocked on the
+    sigtimedwait() call as rtsig papers recommend we will not wake on
+    non-socket state transitions.  As part of libevent, this module
+    must support non-socket polling.
+
+    Many applications, however, do not need to poll on non-sockets and
+    so this module should be able to optimize this case by using
+    sigtimedwait().  For this reason this module can actually trigger
+    events in each of three different ways:
+      - poll() returning ready events from descriptors in the pollset
+
+      - real-time signals dequeued via sigtimedwait()
+
+      - real-time signals that call an installed signal handler which in
+    turn writes the contents of siginfo to one end of a socketpair
+    DGRAM socket.  The other end of the socket is always in the
+    pollset so poll will be guaranteed to return even if the signal is
+    received before entering poll().
+
+    non-socket descriptors force us to block on the poll() for the
+    duration of a dispatch.  In this case we unblock (w/ sigprocmask)
+    the managed signals just before polling.  Each managed signal is
+    handled by signal_handler() which send()'s the contents of siginfo
+    over the socketpair.  Otherwise, we call poll() with a timeout of
+    0ms so it checks the levels of the fd's in the pollset and returns
+    immediately.  Any fd that is a socket and has no active state is
+    removed from the pollset for the next pass -- we will rely on
+    getting a signal for events on these fd's.
+
+    The receiving end of the siginfo socketpair is in the pollset
+    (permanently) so if we are polling on non-sockets, the delivery of
+    signals immediately following sigprocmask( SIG_UNBLOCK...) will
+    result in a readable op->signal_recv_fd which ensures the poll()
+    will return immediately.  If the poll() call is blocking and a
+    signal arrives ( possibly a real-time signal from a socket not in
+    the pollset ) its handler will write the data to the socketpair
+    and interrupt the poll().
+
+    After every poll call we attempt a non-blocking recv from the
+    signal_recv_fd and continue to recv and dispatch the events until
+    recv indicates the socket buffer is empty.
+
+    One might raise concerns about receiving event activations from
+    both poll() and from the rtsig data in the signal_recv_fd.
+    Fortunately, libevent is already structured for event coalescing,
+    so this issue is mitigated ( though we do some work twice for the
+    same event making us less efficient ).  I suspect that the cost of
+    turning off the O_ASYNC flag on fd's in the pollset is more
+    expensive than handling some events twice.  Looking at the
+    kernel's source code for setting O_ASYNC, it looks like it takes a
+    global kernel lock...
+
+    After a poll and recv-loop for the signal_recv_fd, we finally do a
+    sigtimedwait().  sigtimedwait will only block if we haven't
+    blocked in poll() and we have not enqueued events from either the
+    poll or the recv-loop.  Because sigtimedwait blocks all signals
+    that are not in the set of signals to be dequeued, we need to
+    dequeue almost all signals and make sure we dispatch them
+    correctly.  We dequeue any signal that is not blocked as well as
+    all libevent-managed signals.  If we get a signal that is not
+    managed by libevent we lookup the sigaction for the specific
+    signal and call that function ourselves.
+
+    Finally, I should mention that getting a SIGIO signal indicates
+    that the rtsig buffer has overflowed and we have lost events.
+    This forces us to add _every_ descriptor to the pollset to recover.
+
+*/
+
+
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 #include <string.h>
 #include <sys/poll.h>
 #include <sys/queue.h>
-#ifndef HAVE_WORKING_RTSIG
-#include <sys/stat.h>
-#endif
+#include <sys/tree.h>
 #include <unistd.h>
-
-#define EVLIST_X_NORT  0x1000  /* Skip RT signals (internal) */
+#include <sys/socket.h>
 
 #include "event.h"
+#include "event-internal.h"
 #include "log.h"
 extern struct event_list signalqueue;
 
-struct rtsigop {
-    sigset_t sigs;
-    struct pollfd *poll;
-    struct event **toev;
-    int cur, max, total;
-#ifndef HAVE_WORKING_RTSIG
-    int pollmode;
+#include <linux/unistd.h>
+#ifndef __NR_gettid
+#define gettid() getpid()
+#else
+
+#if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 3)))
+_syscall0(pid_t,gettid)
 #endif
+
+#endif
+
+#define EVLIST_NONSOCK   0x1000 /* event is for a non-socket file-descriptor */
+#define EVLIST_DONTDEL   0x2000 /* event should always be in the pollset */
+#define MAXBUFFERSIZE (1024 * 1024 * 2) /* max socketbuffer for signal-spair */
+#define INIT_MAX 16     /* init/min # of fd positions in our pollset */
+
+static int signal_send_fd[_NSIG]; /* the globalend of the signal socketpair */
+static int trouble[_NSIG]; /* 1 when signal-handler cant send to signal_send_fd */
+
+struct rtdata;
+TAILQ_HEAD(rtdata_list, rtdata);
+
+struct rtsigop {
+       sigset_t sigs;        /* signal mask for all _managed_ signals */
+       struct pollfd *poll;  /* poll structures */
+       struct rtdata **ptodat;  /* map poll_position to rtdata */
+       int cur;              /* cur # fd's in a poll set */
+       int max;              /* max # fd's in a poll set, start at 16 and grow as needed */
+       int total;            /* count of fd's we are watching now */
+       int signo;            /* the signo we use for ASYNC fd notifications */
+       int nonsock;          /* number of non-socket fd's we are watching */
+       int highestfd;        /* highest fd accomodated by fdtodat */
+       struct rtdata_list **fdtodat; /* map fd to rtdata ( and thus to event ) */
+       int signal_recv_fd;   /* recv side of the signal_send_fd */
+       int signal_send_fd;   /* recv side of the signal_send_fd */
+       struct event sigfdev; /* our own event structure for the signal fd */
+};
+
+struct rtdata {
+       /* rtdata holds rtsig-private state on each event */
+       TAILQ_ENTRY (rtdata) next;
+       struct event *ev;
+       int poll_position;
+};
+
+void *rtsig_init(void);
+int rtsig_add(void *, struct event *);
+int rtsig_del(void *, struct event *);
+int rtsig_recalc(struct event_base *, void *, int);
+int rtsig_dispatch(struct event_base *, void *, struct timeval *);
+
+struct eventop rtsigops = {
+       "rtsig",
+       rtsig_init,
+       rtsig_add,
+       rtsig_del,
+       rtsig_recalc,
+       rtsig_dispatch
+};
+
+static void
+signal_handler(int sig, siginfo_t *info, void *ctx)
+{
+       /*
+        * the signal handler for all libevent-managed signals only
+        * used if we need to do a blocking poll() call due to
+        * non-socket fd's in the pollset.
+        */
+  
+       siginfo_t *i = info;
+       siginfo_t i_local;
+
+       if (trouble[sig - 1]) {
+               i_local.si_signo = SIGIO;
+               i_local.si_errno = 0;
+               i_local.si_code = 0;
+               i = &i_local;
+               trouble[sig - 1] = 0;
+       }
+
+       if (send(signal_send_fd[sig - 1], i, sizeof(*i),
+               MSG_DONTWAIT|MSG_NOSIGNAL) == -1)
+               trouble[sig - 1] = 1;
+}
+
+static void
+donothing(int fd, short event, void *arg)
+{
+       /*
+        * callback for our signal_recv_fd event structure
+        * we don't want to act on these events, we just want to wake the poll()
+        */
 };
 
-#define INIT_MAX 16
+static void
+signotset(sigset_t *set)
+{
+       int i, l;
+       l = sizeof(*set) / 4;
+       for (i = 0; i < l; i++) {
+               ((unsigned *)set)[i] = ~((unsigned *)set)[i];
+       }
+}
+
+/*  The next three functions manage our private data about each event struct */
 
 static int
-poll_add(struct rtsigop *op, struct event *ev)
+grow_fdset(struct rtsigop *op, int newhigh)
 {
-    struct pollfd *pfd;
-
-    if (op->poll == NULL) return 0;
-
-    if (op->cur == op->max) {
-        void *p;
-
-        p = realloc(op->poll, sizeof(*op->poll) * (op->max << 1));
-        if (!p) {
-            errno = ENOMEM;
-            return -1;
-        }
-        op->poll = p;
-        p = realloc(op->toev, sizeof(*op->toev) * (op->max << 1));
-        if (!p) {
-            op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
-            errno = ENOMEM;
-            return -1;
-        }
-        op->toev = p;
-        op->max <<= 1;
-    }
-
-    pfd = &op->poll[op->cur];
-    pfd->fd = ev->ev_fd;
-    pfd->events = 0;
-    if (ev->ev_events & EV_READ) pfd->events |= POLLIN;
-    if (ev->ev_events & EV_WRITE) pfd->events |= POLLOUT;
-    pfd->revents = 0;
-
-    op->toev[op->cur] = ev;
-    op->cur++;
-
-    return 0;
+       /*
+        * grow the fd -> rtdata array because we have encountered a
+        * new fd too high to fit in the existing array
+        */
+
+       struct rtdata_list **p;
+       struct rtdata_list *datset;
+       int i,x;
+       int newcnt = (newhigh + 1) << 1;
+
+       if (newhigh <= op->highestfd)
+               return (0);
+
+       p = op->fdtodat;
+       p = realloc(op->fdtodat, sizeof(struct rtdata_list *) * newcnt);
+       if (p == NULL)
+               return (-1);
+       op->fdtodat = p;
+
+       datset = calloc(newcnt - (op->highestfd + 1),
+           sizeof(struct rtdata_list));
+       if (datset == NULL)
+               return (-1);
+
+       for (i = op->highestfd + 1, x = 0; i < newcnt; i++, x++) {
+               op->fdtodat[i] = &(datset[x]);
+               TAILQ_INIT(op->fdtodat[i]);
+       }
+
+       op->highestfd = newcnt - 1;
+       return (0);
+}
+
+static struct rtdata *
+ev2dat(struct rtsigop *op, struct event *ev, int create)
+{
+       /*
+        * given an event struct, find the dat structure that
+        * corresponds to it if create is non-zero and the rtdata
+        * structure does not exist, create it return NULL if not
+        * found
+        */
+
+       int found = 0;
+       int fd = ev->ev_fd;
+       struct rtdata *ret = NULL;
+
+       if (op->highestfd < fd && create)
+               if (grow_fdset(op, fd) == -1)
+                       return (NULL);
+  
+       TAILQ_FOREACH(ret, op->fdtodat[fd], next) {
+               if (ret->ev == ev) {
+                       found = 1;
+                       break;
+               }
+       }
+
+       if (!found) {
+               if (!create)
+                       return (NULL);
+
+               ret = calloc(1, sizeof(struct rtdata));
+               if (ret == NULL)
+                       return (NULL);
+               ret->ev = ev;
+               ret->poll_position = -1;
+               TAILQ_INSERT_TAIL(op->fdtodat[fd], ret, next);
+       }
+
+       return (ret);
+}
+
+static void
+dat_del(struct rtsigop *op, struct rtdata *dat)
+{
+       /*
+        * delete our private notes about a given event struct
+        * called from rtsig_del() only
+        */
+       int fd;
+       if (dat == NULL)
+               return;
+       fd = dat->ev->ev_fd;
+
+       TAILQ_REMOVE(op->fdtodat[fd], dat, next);
+       memset(dat, 0, sizeof(*dat));
+       free(dat);
+}
+
+
+static void
+set_sigaction(int sig)
+{
+       /*
+        * set the standard handler for any libevent-managed signal,
+        * including the rtsig used for O_ASYNC notifications
+        */
+       struct sigaction act;
+
+       act.sa_flags = SA_RESTART | SA_SIGINFO;
+       sigfillset(&(act.sa_mask));
+       act.sa_sigaction = &signal_handler;
+       sigaction(sig, &act, NULL);
+}
+
+static int
+find_rt_signal()
+{
+       /* find an unused rtsignal */
+       struct sigaction act;
+       int sig = SIGRTMIN;
+
+       while (sig <= SIGRTMAX) {
+               if (sigaction(sig, NULL, &act) != 0) {
+                       if (errno == EINTR)
+                               continue;
+               } else {
+                       if (act.sa_flags & SA_SIGINFO) {
+                               if (act.sa_sigaction == NULL)
+                                       return (sig);
+                       } else {
+                               if (act.sa_handler == SIG_DFL)
+                                       return (sig);
+                       }
+               }
+               sig++;
+       }
+       return (0);
+}
+
+/*
+ * the next three functions manage our pollset and the memory management for 
+ * fd -> rtdata -> event -> poll_position maps
+ */
+
+static int
+poll_add(struct rtsigop *op, struct event *ev, struct rtdata *dat)
+{
+       struct pollfd *pfd;
+       int newmax = op->max << 1;
+       int pp;
+
+       if (op->poll == NULL)
+               return (0);
+
+       if (dat == NULL)
+               dat = ev2dat(op, ev, 0);
+
+       if (dat == NULL)
+               return (0);
+
+       pp = dat->poll_position;
+
+       if (pp != -1) {
+               pfd = &op->poll[pp];
+               if (ev->ev_events & EV_READ)
+                       pfd->events |= POLLIN;
+    
+               if (ev->ev_events & EV_WRITE)
+                       pfd->events |= POLLOUT;
+    
+               return (0);
+       }
+
+       if (op->cur == op->max) {
+               void *p = realloc(op->poll, sizeof(*op->poll) * newmax);
+               if (p == NULL) {
+                       errno = ENOMEM;
+                       return (-1);
+               }
+               op->poll = p;
+
+               p = realloc(op->ptodat, sizeof(*op->ptodat) * newmax);
+               if (p == NULL) {
+                       /* shrink the pollset back down */
+                       op->poll = realloc(op->poll,
+                           sizeof(*op->poll) * op->max);
+                       errno = ENOMEM;
+                       return (-1);
+               }
+               op->ptodat = p;
+               op->max = newmax;
+       }
+
+       pfd = &op->poll[op->cur];
+       pfd->fd = ev->ev_fd;
+       pfd->revents = 0;
+       pfd->events = 0;
+
+       if (ev->ev_events & EV_READ)
+               pfd->events |= POLLIN;
+  
+       if (ev->ev_events & EV_WRITE)
+               pfd->events |= POLLOUT;
+  
+       op->ptodat[op->cur] = dat;
+       dat->poll_position = op->cur;
+       op->cur++;
+
+       return (0);
 }
 
 static void
 poll_free(struct rtsigop *op, int n)
 {
-    if (op->poll == NULL) return;
-
-    op->cur--;
-    if (n < op->cur) {
-        memcpy(&op->poll[n], &op->poll[op->cur], sizeof(*op->poll));
-        op->toev[n] = op->toev[op->cur];
-    }
-    if (op->max > INIT_MAX && op->cur < op->max >> 1) {
-        op->max >>= 1;
-        op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
-        op->toev = realloc(op->toev, sizeof(*op->toev) * op->max);
-    }
+  if (op->poll == NULL)
+         return;
+
+  op->cur--;
+
+  if (n < op->cur) {
+    memcpy(&op->poll[n], &op->poll[op->cur], sizeof(*op->poll));
+    op->ptodat[n] = op->ptodat[op->cur];
+    op->ptodat[n]->poll_position = n;
+  }
+
+
+  /* less then half the max in use causes us to shrink */
+  if (op->max > INIT_MAX && op->cur < op->max >> 1) {
+    op->max >>= 1;
+    op->poll = realloc(op->poll, sizeof(*op->poll) * op->max);
+    op->ptodat = realloc(op->ptodat, sizeof(*op->ptodat) * op->max);
+  }
 }
 
 static void
-poll_remove(struct rtsigop *op, struct event *ev)
+poll_remove(struct rtsigop *op, struct event *ev, struct rtdata *dat)
 {
-    int i;
-
-    for (i = 0; i < op->cur; i++) {
-        if (op->toev[i] == ev) {
-            poll_free(op, i);
-            break;
-        }
-    }
+  int pp;
+  if (dat == NULL)
+    dat = ev2dat(op, ev, 0);
+
+  if (dat == NULL) return;
+
+  pp = dat->poll_position;
+  if (pp != -1) {
+    poll_free(op, pp);
+    dat->poll_position = -1;
+  }
 }
 
 static void
 activate(struct event *ev, int flags)
 {
-    if (!(ev->ev_events & EV_PERSIST)) event_del(ev);
-    event_active(ev, flags, 1);
+       /* activate an event, possibly removing one-shot events */
+       if (!(ev->ev_events & EV_PERSIST))
+               event_del(ev);
+       event_active(ev, flags, 1);
 }
 
-void *rtsig_init(void);
-int rtsig_add(void *, struct event *);
-int rtsig_del(void *, struct event *);
-int rtsig_recalc(struct event_base *, void *, int);
-int rtsig_dispatch(struct event_base *, void *, struct timeval *);
-
-struct eventop rtsigops = {
-    "rtsig",
-    rtsig_init,
-    rtsig_add,
-    rtsig_del,
-    rtsig_recalc,
-    rtsig_dispatch
-};
+#define FD_CLOSEONEXEC(x) do { \
+        if (fcntl(x, F_SETFD, 1) == -1) \
+                event_warn("fcntl(%d, F_SETFD)", x); \
+} while (0)
 
 void *
 rtsig_init(void)
 {
        struct rtsigop *op;
+       int sockets[2];
+       int optarg;
+       struct rtdata *dat;
+       int flags;
 
        if (getenv("EVENT_NORTSIG"))
-               return (NULL);
-
-       op = malloc(sizeof(*op));
-       if (op == NULL) return (NULL);
+               goto err;
 
-       memset(op, 0, sizeof(*op));
+       op = calloc(1, sizeof(*op));
+       if (op == NULL)
+               goto err;
 
        op->max = INIT_MAX;
        op->poll = malloc(sizeof(*op->poll) * op->max);
-       if (op->poll == NULL) {
-               free(op);
-               return (NULL);
-       }
-       op->toev = malloc(sizeof(*op->toev) * op->max);
-       if (op->toev == NULL) {
-               free(op->poll);
-               free(op);
-               return (NULL);
-       }
+       if (op->poll == NULL) 
+               goto err_free_op;
+
+       op->signo = find_rt_signal();
+       if (op->signo == 0)
+               goto err_free_poll;
+  
+       op->nonsock = 0;
+
+       if (socketpair(PF_UNIX, SOCK_DGRAM, 0, sockets) != 0)
+               goto err_free_poll;
+
+       FD_CLOSEONEXEC(sockets[0]);
+       FD_CLOSEONEXEC(sockets[1]);
+
+       signal_send_fd[op->signo - 1] = sockets[0];
+       trouble[op->signo - 1] = 0;
+       op->signal_send_fd = sockets[0];
+       op->signal_recv_fd = sockets[1];
+       flags = fcntl(op->signal_recv_fd, F_GETFL);
+       fcntl(op->signal_recv_fd, F_SETFL, flags | O_NONBLOCK);
+
+       optarg = MAXBUFFERSIZE;
+       setsockopt(signal_send_fd[op->signo - 1],
+           SOL_SOCKET, SO_SNDBUF, 
+           &optarg, sizeof(optarg));
+  
+       optarg = MAXBUFFERSIZE;
+       setsockopt(op->signal_recv_fd,
+           SOL_SOCKET, SO_RCVBUF,
+           &optarg, sizeof(optarg));
+
+       op->highestfd = -1;
+       op->fdtodat = NULL;
+       if (grow_fdset(op, 1) == -1)
+               goto err_close_pair;
+
+       op->ptodat = malloc(sizeof(*op->ptodat) * op->max);
+       if (op->ptodat == NULL)
+               goto err_close_pair;
 
        sigemptyset(&op->sigs);
        sigaddset(&op->sigs, SIGIO);
-       sigaddset(&op->sigs, SIGRTMIN);
+       sigaddset(&op->sigs, op->signo);
        sigprocmask(SIG_BLOCK, &op->sigs, NULL);
+       set_sigaction(SIGIO);
+       set_sigaction(op->signo);
+
+       event_set(&(op->sigfdev), op->signal_recv_fd, EV_READ|EV_PERSIST,
+           donothing, NULL);
+       op->sigfdev.ev_flags |= EVLIST_DONTDEL;
+       dat = ev2dat(op, &(op->sigfdev), 1);
+       poll_add(op, &(op->sigfdev), dat);
 
        return (op);
+
+ err_close_pair:
+       close(op->signal_recv_fd);
+       close(signal_send_fd[op->signo - 1]);
+
+ err_free_poll:
+       free(op->poll);
+ err_free_op:
+       free(op);
+ err:
+       return (NULL);
 }
 
 int
@@ -173,79 +635,102 @@ rtsig_add(void *arg, struct event *ev)
 {
        struct rtsigop *op = (struct rtsigop *) arg;
        int flags, i;
-#ifndef HAVE_WORKING_RTSIG
-       struct stat st;
-#endif
+       struct stat statbuf;
+       struct rtdata *dat;
 
        if (ev->ev_events & EV_SIGNAL) {
+               int signo = EVENT_SIGNAL(ev);
+  
                sigaddset(&op->sigs, EVENT_SIGNAL(ev));
-               return sigprocmask(SIG_BLOCK, &op->sigs, NULL);
+               if (sigprocmask(SIG_BLOCK, &op->sigs, NULL) == -1)
+                       return (-1);
+    
+               set_sigaction(signo);
+    
+               signal_send_fd[signo - 1] = op->signal_send_fd;
+               trouble[signo - 1] = 0;
+
+               return (0);
        }
 
-       if (!(ev->ev_events & (EV_READ | EV_WRITE))) return 0;
+       if (!(ev->ev_events & (EV_READ|EV_WRITE))) 
+               return (0);
+
+       if (-1 == fstat(ev->ev_fd, &statbuf))
+               return (-1);
 
-#ifndef HAVE_WORKING_RTSIG
-       if (fstat(ev->ev_fd, &st) == -1) return -1;
-       if (S_ISFIFO(st.st_mode)) {
-               ev->ev_flags |= EVLIST_X_NORT;
-               op->pollmode++;
-       }
-#endif
+       if (!S_ISSOCK(statbuf.st_mode))
+               ev->ev_flags |= EVLIST_NONSOCK;
 
        flags = fcntl(ev->ev_fd, F_GETFL);
        if (flags == -1)
                return (-1);
 
        if (!(flags & O_ASYNC)) {
-               if (fcntl(ev->ev_fd, F_SETSIG, SIGRTMIN) == -1
-                   || fcntl(ev->ev_fd, F_SETOWN, (int) getpid()) == -1)
+               if (fcntl(ev->ev_fd, F_SETSIG, op->signo) == -1 ||
+                   fcntl(ev->ev_fd, F_SETOWN, (int) gettid()) == -1)
                        return (-1);
-
-               if (fcntl(ev->ev_fd, F_SETFL, flags | O_ASYNC))
+    
+               /*
+                * the overhead of always handling writeable edges
+                * isn't going to be that bad...
+                */
+               if (fcntl(ev->ev_fd, F_SETFL, flags | O_ASYNC|O_RDWR)) 
                        return (-1);
        }
 
 #ifdef O_ONESIGFD
+       /*
+        * F_SETAUXFL and O_ONESIGFD are defined in a non-standard
+        * linux kernel patch to coalesce events for fds
+        */
        fcntl(ev->ev_fd, F_SETAUXFL, O_ONESIGFD);
 #endif
 
+       dat = ev2dat(op, ev, 1);
+       if (dat == NULL)
+               return (-1);
+
        op->total++;
-       if (poll_add(op, ev) == -1)
-               goto err;
+       if (ev->ev_flags & EVLIST_NONSOCK)
+               op->nonsock++;
+
+       if (poll_add(op, ev, dat) == -1) {
+               /* must check the level of new fd's */
+               i = errno;
+               fcntl(ev->ev_fd, F_SETFL, flags);
+               errno = i;
+               return (-1);
+       }
 
        return (0);
-
- err:
-       i = errno;
-       fcntl(ev->ev_fd, F_SETFL, flags);
-       errno = i;
-       return (-1);
 }
 
 int
 rtsig_del(void *arg, struct event *ev)
 {
+       struct rtdata *dat;
        struct rtsigop *op = (struct rtsigop *) arg;
 
        if (ev->ev_events & EV_SIGNAL) {
                sigset_t sigs;
 
                sigdelset(&op->sigs, EVENT_SIGNAL(ev));
-
+    
                sigemptyset(&sigs);
                sigaddset(&sigs, EVENT_SIGNAL(ev));
                return (sigprocmask(SIG_UNBLOCK, &sigs, NULL));
        }
 
-       if (!(ev->ev_events & (EV_READ | EV_WRITE)))
+       if (!(ev->ev_events & (EV_READ|EV_WRITE)))
                return (0);
 
-#ifndef HAVE_WORKING_RTSIG
-       if (ev->ev_flags & EVLIST_X_NORT)
-               op->pollmode--;
-#endif
-       poll_remove(op, ev);
+       dat = ev2dat(op, ev, 0);
+       poll_remove(op, ev, dat);
+       dat_del(op, dat);
        op->total--;
+       if (ev->ev_flags & EVLIST_NONSOCK)
+               op->nonsock--;
 
        return (0);
 }
@@ -253,183 +738,248 @@ rtsig_del(void *arg, struct event *ev)
 int
 rtsig_recalc(struct event_base *base, void *arg, int max)
 {
-    return (0);
+       return (0);
 }
 
-int
-rtsig_dispatch(struct event_base *base, void *arg, struct timeval *tv)
+/*
+ * the following do_X functions implement the different stages of a single
+ * eventloop pass: poll(), recv(sigsock), sigtimedwait()
+ *
+ * do_siginfo_dispatch() is a common factor to both do_sigwait() and
+ * do_signals_from_socket().
+ */
+
+static inline int
+do_poll(struct rtsigop *op, struct timespec *ts)
 {
-       struct rtsigop *op = (struct rtsigop *) arg;
-       struct timespec ts;
-       int res, i;
+       int res = 0;
+       int i = 0;
+  
+       if (op->cur > 1) {
+               /* non-empty poll set (modulo the signalfd) */
+               if (op->nonsock) {
+                       int timeout = ts->tv_nsec / 1000000 + ts->tv_sec * 1000;
+                       
+                       sigprocmask(SIG_UNBLOCK, &(op->sigs), NULL);
+
+                       res = poll(op->poll, op->cur, timeout);
+                       
+                       sigprocmask(SIG_BLOCK, &(op->sigs), NULL);
+                       
+                       ts->tv_sec = 0;
+                       ts->tv_nsec = 0;
+               } else {
+                       res = poll(op->poll, op->cur, 0);
+               }
 
-       if (op->poll == NULL)
-               goto retry_poll;
-#ifndef HAVE_WORKING_RTSIG
-       if (op->pollmode)
-               goto poll_all;
-#endif
+               if (res < 0) {
+                       return (errno == EINTR ? 0 : -1);
+               } else if (res) {
+                       ts->tv_sec = 0;
+                       ts->tv_nsec = 0;
+               }
 
-       if (op->cur) {
-               ts.tv_sec = ts.tv_nsec = 0;
-       } else {
-               ts.tv_sec = tv->tv_sec;
-               ts.tv_nsec = tv->tv_usec * 1000;
+               i = 0;
+               while (i < op->cur) {
+                       struct rtdata *dat = op->ptodat[i];
+                       struct event *ev = dat->ev;
+
+                       if (op->poll[i].revents) {
+                               int flags = 0;
+       
+                               if (op->poll[i].revents & (POLLIN | POLLERR))
+                                       flags |= EV_READ;
+       
+                               if (op->poll[i].revents & POLLOUT)
+                                       flags |= EV_WRITE;
+       
+                               if (!(ev->ev_events & EV_PERSIST)) {
+                                       poll_remove(op, ev, op->ptodat[i]);
+                                       event_del(ev);
+                               } else {
+                                       i++;
+                               }
+       
+                               event_active(ev, flags, 1);
+                       } else {
+                               if (ev->ev_flags & (EVLIST_NONSOCK|EVLIST_DONTDEL)) {
+                                       i++;
+                               } else {
+                                       poll_remove(op, ev, op->ptodat[i]);
+                               }
+                       }
+               }
        }
+       return (res);
+}
 
-       for (;;) {
-               siginfo_t info;
-               struct event *ev;
-               int signum;
+static inline int
+do_siginfo_dispatch(struct event_base *base, struct rtsigop *op,
+    siginfo_t *info)
+{
+       int signum;
+       struct rtdata *dat, *next_dat;
+       struct event *ev, *next_ev;
 
-               signum = sigtimedwait(&op->sigs, &info, &ts);
+       if (info == NULL)
+               return (-1);
 
-               if (signum == -1) {
-                       if (errno == EAGAIN)
-                               break;
-                       return (errno == EINTR ? 0 : -1);
-               }
+       signum = info->si_signo;
+       if (signum == op->signo) {
+               int flags, sigok = 0;
+               flags = 0;
 
-               ts.tv_sec = ts.tv_nsec = 0;
+               if (info->si_band & (POLLIN|POLLERR))
+                       flags |= EV_READ;
+               if (info->si_band & POLLOUT)
+                       flags |= EV_WRITE;
 
-               if (signum == SIGIO) {
-#ifndef HAVE_WORKING_RTSIG
-               poll_all:
-#endif
-                       free(op->poll);
-                       free(op->toev);
-               retry_poll:
-                       op->cur = 0;
-                       op->max = op->total;
-                       op->poll = malloc(sizeof(*op->poll) * op->total);
-                       if (op->poll == NULL)
-                               return (-1);
-                       op->toev = malloc(sizeof(*op->toev) * op->total);
-                       if (op->toev == NULL) {
-                               free(op->poll);
-                               op->poll = NULL;
-                               return (-1);
-                       }
+               if (!flags)
+                       return (0);
 
-                       TAILQ_FOREACH(ev, &base->eventqueue, ev_next)
-                           if (!(ev->ev_flags & EVLIST_X_NORT))
-                                   poll_add(op, ev);
+               if (info->si_fd > op->highestfd)
+                       return (-1);
 
-                       break;
+               dat = TAILQ_FIRST(op->fdtodat[info->si_fd]);
+               while (dat != TAILQ_END(op->fdtodat[info->si_fd])) {
+                       next_dat = TAILQ_NEXT(dat, next);
+                       if (flags & dat->ev->ev_events) {
+                               ev = dat->ev;
+                               poll_add(op, ev, dat);
+                               activate(ev, flags & ev->ev_events);
+                               sigok = 1;
+                       }
+                       dat = next_dat;
                }
-
-               if (signum == SIGRTMIN) {
-                       int flags, i, sigok = 0;
-
-                       if (info.si_band <= 0) { /* SI_SIGIO */
-                               flags = EV_READ | EV_WRITE;
+       } else if (signum == SIGIO) {
+               TAILQ_FOREACH(ev, &base->eventqueue, ev_next) {
+                       if (ev->ev_events & (EV_READ|EV_WRITE))
+                               poll_add(op, ev, NULL);
+               }
+               return (1); /* 1 means the caller should poll() again */
+    
+       } else if (sigismember(&op->sigs, signum)) {
+               /* managed signals are queued */
+               ev = TAILQ_FIRST(&signalqueue);
+               while (ev != TAILQ_END(&signalqueue)) {
+                       next_ev = TAILQ_NEXT(ev, ev_signal_next);
+                       if (EVENT_SIGNAL(ev) == signum)
+                               activate(ev, EV_SIGNAL);
+                       ev = next_ev;
+               }
+       } else {
+               /* dispatch unmanaged signals immediately */
+               struct sigaction sa;
+               if (sigaction(signum, NULL, &sa) == 0) {
+                       if ((sa.sa_flags & SA_SIGINFO) && sa.sa_sigaction) {
+                               (*sa.sa_sigaction)(signum, info, NULL);
+                       } else if (sa.sa_handler) {
+                               if ((int)sa.sa_handler != 1)
+                                       (*sa.sa_handler)(signum);
                        } else {
-                               flags = 0;
-                               if (info.si_band & POLLIN) flags |= EV_READ;
-                               if (info.si_band & POLLOUT) flags |= EV_WRITE;
-                               if (!flags) continue;
+                               if (signum != SIGCHLD) {
+                                       /* non-blocked SIG_DFL */
+                                       kill(gettid(), signum);
+                               }
                        }
+               }
+       }
 
-                       for (i = 0; flags && i < op->cur; i++) {
-                               ev = op->toev[i];
+       return (0);
+}
 
-                               if (ev->ev_fd == info.si_fd) {
-                                       flags &= ~ev->ev_events;
-                                       sigok = 1;
-                               }
-                       }
+/*
+ * return 1 if we should poll again
+ * return 0 if we are all set
+ * return -1 on error
+ */
+static inline int
+do_sigwait(struct event_base *base, struct rtsigop *op, struct timespec *ts,
+    sigset_t *sigs)
+{
+       for (;;) {
+               siginfo_t info;
+               int signum;
 
-                       for (ev = TAILQ_FIRST(&base->eventqueue);
-                           flags && ev != TAILQ_END(&base->eventqueue);
-                           ev = TAILQ_NEXT(ev, ev_next)) {
-                               if (ev->ev_fd == info.si_fd) {
-                                       if (flags & ev->ev_events) {
-                                               i = poll_add(op, ev);
-                                               if (i == -1) return -1;
-                                               flags &= ~ev->ev_events;
-                                       }
-                                       sigok = 1;
-                               }
-                       }
+               signum = sigtimedwait(sigs, &info, ts);
 
-                       if (!sigok) {
-                               flags = fcntl(info.si_fd, F_GETFL);
-                               if (flags == -1) return -1;
-                               fcntl(info.si_fd, F_SETFL, flags & ~O_ASYNC);
-                       }
-               } else {
-                       TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
-                               if (EVENT_SIGNAL(ev) == signum)
-                                       activate(ev, EV_SIGNAL);
-                       }
+               ts->tv_sec = 0;
+               ts->tv_nsec = 0;
+
+               if (signum == -1) {
+                       if (errno == EAGAIN || errno == EINTR)
+                               return (0);
+                       return (-1);
+               } else if (1 == do_siginfo_dispatch(base, op, &info)) {
+                       return (1);
                }
        }
 
-       if (!op->cur)
-               return (0);
+       /* NOTREACHED */
+}
 
-       res = poll(op->poll, op->cur, tv->tv_sec * 1000 + 
-                                     (tv->tv_usec + 999) / 1000);
-       if (res < 0)
-               return (-1);
+static inline int
+do_signals_from_socket(struct event_base *base, struct rtsigop *op,
+    struct timespec *ts)
+{
+       int fd = op->signal_recv_fd;
+       siginfo_t info;
+       int res;
 
-       i = 0;
-#ifdef HAVE_WORKING_RTSIG
-       while (i < res) {
-#else
-       while (i < op->cur) {
-#endif
-               if (op->poll[i].revents) {
-                       int flags = 0;
-                       struct event *ev = op->toev[i];
-
-                       if (op->poll[i].revents & POLLIN)
-                               flags |= EV_READ;
-                       if (op->poll[i].revents & POLLOUT)
-                               flags |= EV_WRITE;
-
-                       if (!(ev->ev_events & EV_PERSIST)) {
-                               event_del(ev);
-                               res--;
-                       } else {
-                               i++;
-                       }
-                       event_active(ev, flags, 1);
-               } else {
-#ifndef HAVE_WORKING_RTSIG
-                       if (op->toev[i]->ev_flags & EVLIST_X_NORT) {
-                               i++;
-                               res++;
+       for (;;) {
+               res = recv(fd, &info, sizeof(info), MSG_NOSIGNAL);
+               if (res == -1) {
+                       if (errno == EAGAIN)
+                               return (0);
+                       if (errno == EINTR)
                                continue;
-                       }
-#endif
-                       for (;;) {
-                               op->cur--;
-                               if (i == op->cur)
-                                       break;
-                               if (op->poll[op->cur].revents) {
-                                       memcpy(&op->poll[i], &op->poll[op->cur], sizeof(*op->poll));
-                                       op->toev[i] = op->toev[op->cur];
-                                       break;
-                               }
-                       }
+                       return (-1);
+               } else {
+                       ts->tv_sec = 0;
+                       ts->tv_nsec = 0;
+                       if (1 == do_siginfo_dispatch(base, op, &info))
+                               return (1);
                }
        }
-#ifdef HAVE_WORKING_RTSIG
-       op->cur = res;
-#endif
+       /* NOTREACHED */
+}
 
-       if (!op->cur) {
-               op->max = INIT_MAX;
-               free(op->poll);
-               free(op->toev);
-               /* We just freed it, we shouldn't have a problem getting it back. */
-               op->poll = malloc(sizeof(*op->poll) * op->max);
-               op->toev = malloc(sizeof(*op->toev) * op->max);
+int
+rtsig_dispatch(struct event_base *base, void *arg, struct timeval *tv)
+{
+       struct rtsigop *op = (struct rtsigop *) arg;
+       struct timespec ts;
+       int res;
+       sigset_t sigs;
 
-               if (op->poll == NULL || op->toev == NULL)
-                       event_err(1, "%s: malloc");
-       }
+       ts.tv_sec = tv->tv_sec;
+       ts.tv_nsec = tv->tv_usec * 1000;
+
+ poll_for_level:
+       res = do_poll(op, &ts); /* ts can be modified in do_XXX() */
+
+       res = do_signals_from_socket(base, op, &ts);
+       if (res == 1)
+               goto poll_for_level;
+       else if (res == -1)
+               return (-1);
+
+       /*
+        * the mask = managed_signals | unblocked-signals
+        * MM - if this is not blocking do we need to cast the net this wide?
+        */
+       sigemptyset(&sigs);
+       sigprocmask(SIG_BLOCK, &sigs, &sigs);
+       signotset(&sigs);
+       sigorset(&sigs, &sigs, &op->sigs);
+
+       res = do_sigwait(base, op, &ts, &sigs);
+
+       if (res == 1)
+               goto poll_for_level;
+       else if (res == -1)
+               return (-1);
 
        return (0);
 }
+
index f43e70c9f86c1ee62545997f1c6bfa7a4357d94a..3f5158385c31cdc8c8c57a910a7233ad83a03bb6 100644 (file)
@@ -754,16 +754,21 @@ evtag_fuzz()
        struct timeval tv;
        int i, j;
 
+       int not_failed = 0;
        for (j = 0; j < 100; j++) {
                for (i = 0; i < sizeof(buffer); i++)
                        buffer[i] = rand();
                evbuffer_drain(tmp, -1);
                evbuffer_add(tmp, buffer, sizeof(buffer));
 
-               if (evtag_unmarshal_timeval(tmp, 0, &tv) != -1) {
-                       fprintf(stderr, "evtag_unmarshal should have failed");
-                       exit(1);
-               }
+               if (evtag_unmarshal_timeval(tmp, 0, &tv) != -1)
+                       not_failed++;
+       }
+
+       /* The majority of decodes should fail */
+       if (not_failed >= 10) {
+               fprintf(stderr, "evtag_unmarshal should have failed");
+               exit(1);
        }
 
        /* Now insert some corruption into the tag length field */