]> granicus.if.org Git - libevent/commitdiff
Add event_config function to limit time/callbacks between calls to dispatch
authorNick Mathewson <nickm@torproject.org>
Thu, 2 Dec 2010 01:44:05 +0000 (20:44 -0500)
committerNick Mathewson <nickm@torproject.org>
Thu, 18 Aug 2011 02:03:57 +0000 (22:03 -0400)
event-internal.h
event.c
include/event2/event.h

index 0fc3216bf9bd0fe5170e46330acaa971cf83b928..f495d6d2ad254f6bba27585699321f493f58c75d 100644 (file)
@@ -273,6 +273,9 @@ struct event_base {
        /** Flags that this base was configured with */
        enum event_base_config_flag flags;
 
+       struct timeval max_dispatch_time;
+       int max_dispatch_callbacks;
+
        /* Notify main thread to wake up break, etc. */
        /** True if the base already has a pending notify, and we don't need
         * to add any more. */
@@ -299,6 +302,8 @@ struct event_config {
        TAILQ_HEAD(event_configq, event_config_entry) entries;
 
        int n_cpus_hint;
+       struct timeval max_dispatch_interval;
+       int max_dispatch_callbacks;
        enum event_method_feature require_features;
        enum event_base_config_flag flags;
 };
diff --git a/event.c b/event.c
index 667ed48bb8e1bf208606960489e229c2f151ab26..4b0c5d983240fb98050f490fb239cecc8cc909b3 100644 (file)
--- a/event.c
+++ b/event.c
@@ -54,6 +54,7 @@
 #include <signal.h>
 #include <string.h>
 #include <time.h>
+#include <limits.h>
 
 #include "event2/event.h"
 #include "event2/event_struct.h"
@@ -586,6 +587,17 @@ event_base_new_with_config(const struct event_config *cfg)
        should_check_environment =
            !(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
 
+       if (cfg)
+               memcpy(&base->max_dispatch_time,
+                   &cfg->max_dispatch_interval, sizeof(struct timeval));
+       else
+               base->max_dispatch_time.tv_sec = -1;
+       if (cfg && cfg->max_dispatch_callbacks >= 0) {
+               base->max_dispatch_callbacks = cfg->max_dispatch_callbacks;
+       } else {
+               base->max_dispatch_callbacks = INT_MAX;
+       }
+
        for (i = 0; eventops[i] && !base->evbase; i++) {
                if (cfg != NULL) {
                        /* determine if this backend should be avoided */
@@ -910,6 +922,8 @@ event_config_new(void)
                return (NULL);
 
        TAILQ_INIT(&cfg->entries);
+       cfg->max_dispatch_interval.tv_sec = -1;
+       cfg->max_dispatch_callbacks = -1;
 
        return (cfg);
 }
@@ -979,6 +993,19 @@ event_config_set_num_cpus_hint(struct event_config *cfg, int cpus)
        return (0);
 }
 
+int
+event_config_set_max_dispatch_interval(struct event_config *cfg,
+    const struct timeval *max_interval, int max_callbacks)
+{
+       if (max_interval)
+               memcpy(&cfg->max_dispatch_interval, max_interval,
+                   sizeof(struct timeval));
+       else
+               cfg->max_dispatch_interval.tv_sec = -1;
+       cfg->max_dispatch_callbacks = max_callbacks;
+       return (0);
+}
+
 int
 event_priority_init(int npriorities)
 {
@@ -1279,7 +1306,8 @@ event_persist_closure(struct event_base *base, struct event *ev)
 */
 static int
 event_process_active_single_queue(struct event_base *base,
-    struct event_list *activeq)
+    struct event_list *activeq,
+    int max_to_process, const struct timeval *endtime)
 {
        struct event *ev;
        int count = 0;
@@ -1332,6 +1360,14 @@ event_process_active_single_queue(struct event_base *base,
 
                if (base->event_break)
                        return -1;
+               if (count >= max_to_process)
+                       return count;
+               if (count && endtime) {
+                       struct timeval now;
+                       evutil_gettimeofday(&now, NULL);
+                       if (evutil_timercmp(&now, endtime, >=))
+                               return count;
+               }
        }
        return count;
 }
@@ -1343,12 +1379,16 @@ event_process_active_single_queue(struct event_base *base,
    we process.
  */
 static int
-event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
+event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr,
+    int max_to_process, const struct timeval *endtime)
 {
        int count = 0;
        struct deferred_cb *cb;
-
 #define MAX_DEFERRED 16
+       if (max_to_process > MAX_DEFERRED)
+               max_to_process = MAX_DEFERRED;
+#undef MAX_DEFERRED
+
        while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
                cb->queued = 0;
                TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
@@ -1360,10 +1400,15 @@ event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
                LOCK_DEFERRED_QUEUE(queue);
                if (*breakptr)
                        return -1;
-               if (++count == MAX_DEFERRED)
+               if (++count >= max_to_process)
                        break;
+               if (endtime) {
+                       struct timeval now;
+                       evutil_gettimeofday(&now, NULL);
+                       if (evutil_timercmp(&now, endtime, >=))
+                               return count;
+               }
        }
-#undef MAX_DEFERRED
        return count;
 }
 
@@ -1379,11 +1424,22 @@ event_process_active(struct event_base *base)
        /* Caller must hold th_base_lock */
        struct event_list *activeq = NULL;
        int i, c = 0;
+       const struct timeval *endtime;
+       struct timeval tv;
+       const int maxcb = base->max_dispatch_callbacks;
+       if (base->max_dispatch_time.tv_sec >= 0) {
+               evutil_gettimeofday(&tv, NULL);
+               evutil_timeradd(&base->max_dispatch_time, &tv, &tv);
+               endtime = &tv;
+       } else {
+               endtime = NULL;
+       }
 
        for (i = 0; i < base->nactivequeues; ++i) {
                if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
                        activeq = &base->activequeues[i];
-                       c = event_process_active_single_queue(base, activeq);
+                       c = event_process_active_single_queue(base, activeq,
+                           maxcb, endtime);
                        if (c < 0)
                                return -1;
                        else if (c > 0)
@@ -1394,7 +1450,8 @@ event_process_active(struct event_base *base)
                }
        }
 
-       event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
+       event_process_deferred_callbacks(&base->defer_queue,&base->event_break,
+           maxcb-c, endtime);
        return c;
 }
 
index c487f7808888a542d4019d9a5079ed19b1a4799a..7c2ab600e4cf74621bcafeac5135fad4883f9fe0 100644 (file)
@@ -536,6 +536,24 @@ int event_config_set_flag(struct event_config *cfg, int flag);
  */
 int event_config_set_num_cpus_hint(struct event_config *cfg, int cpus);
 
+/**
+ * Record an interval and/or a number of callbacks after which the event base
+ * should check for new events.  By default, the event base will run as many
+ * events are as activated at the higest activated priority before checking
+ * for new events.  If you configure it by setting max_interval, it will check
+ * the time after each callback, and not allow more than max_interval to
+ * elapse before checking for new events.  If you configure it by setting
+ * max_callbacks to a value >= 0, it will run no more than max_callbacks
+ * callbacks before checking for new events.
+ *
+ * This option can decrease the latency of high-priority events, and
+ * avoid priority inversions where multiple low-priority events keep us from
+ * polling for high-priority events, but at the expense of slightly decreasing
+ * the throughput.  Use it with caution!
+ **/
+int event_config_set_max_dispatch_interval(struct event_config *cfg,
+    const struct timeval *max_interval, int max_callbacks);
+
 /**
   Initialize the event API.