]> granicus.if.org Git - libevent/commitdiff
support for low and high watermarks
authorNiels Provos <provos@gmail.com>
Sun, 4 Apr 2004 02:20:21 +0000 (02:20 +0000)
committerNiels Provos <provos@gmail.com>
Sun, 4 Apr 2004 02:20:21 +0000 (02:20 +0000)
svn:r101

buffer.c
evbuffer.c
event.c
event.h

index ad608b77a0dca44d0541e8fa3266e717e4292442..d8bf08fdfa51f09167f922f2e9df93dfd0fe0a72 100644 (file)
--- a/buffer.c
+++ b/buffer.c
@@ -88,6 +88,7 @@ int
 evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen)
 {
        size_t need = buf->off + datlen;
+       size_t oldoff = buf->off;
 
        if (buf->totallen < need) {
                void *newbuf;
@@ -108,19 +109,30 @@ evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen)
        memcpy(buf->buffer + buf->off, data, datlen);
        buf->off += datlen;
 
+       if (datlen && buf->cb != NULL)
+               (*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
        return (0);
 }
 
 void
 evbuffer_drain(struct evbuffer *buf, size_t len)
 {
+       size_t oldoff = buf->off;
+
        if (len >= buf->off) {
                buf->off = 0;
-               return;
+               goto done;
        }
 
        memmove(buf->buffer, buf->buffer + len, buf->off - len);
        buf->off -= len;
+
+ done:
+       /* Tell someone about changes in this buffer */
+       if (buf->off != oldoff && buf->cb != NULL)
+               (*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
 }
 
 int
@@ -176,3 +188,11 @@ evbuffer_find(struct evbuffer *buffer, u_char *what, size_t len)
 
        return (NULL);
 }
+
+void evbuffer_setcb(struct evbuffer *buffer,
+    void (*cb)(struct evbuffer *, size_t, size_t, void *),
+    void *cbarg)
+{
+       buffer->cb = cb;
+       buffer->cbarg = cbarg;
+}
index d765ce5fed86ce824fa7f452803c000eebb52a46..b476b5e2d2a98c0753aad04394c81f75f62f924f 100644 (file)
@@ -41,12 +41,34 @@ bufferevent_add(struct event *ev, int timeout)
        return (event_add(ev, ptv));
 }
 
+/* 
+ * This callback is executed when the size of the input buffer changes.
+ * We use it to apply back pressure on the reading side.
+ */
+
+void
+bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
+    void *arg) {
+       struct bufferevent *bufev = arg;
+       /* 
+        * If we are below the watermak then reschedule reading if it's
+        * still enabled.
+        */
+       if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
+               evbuffer_setcb(buf, NULL, NULL);
+
+               if (bufev->enabled & EV_READ)
+                       bufferevent_add(&bufev->ev_read, bufev->timeout_read);
+       }
+}
+
 static void
 bufferevent_readcb(int fd, short event, void *arg)
 {
        struct bufferevent *bufev = arg;
        int res = 0;
        short what = EVBUFFER_READ;
+       size_t len;
 
        if (event == EV_TIMEOUT) {
                what |= EVBUFFER_TIMEOUT;
@@ -69,6 +91,19 @@ bufferevent_readcb(int fd, short event, void *arg)
 
        bufferevent_add(&bufev->ev_read, bufev->timeout_read);
 
+       /* See if this callbacks meets the water marks */
+       len = EVBUFFER_LENGTH(bufev->input);
+       if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
+               return;
+       if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
+               struct evbuffer *buf = bufev->input;
+               event_del(&bufev->ev_read);
+
+               /* Now schedule a callback for us */
+               evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
+               return;
+       }
+
        /* Invoke the user callback - must always be called last */
        (*bufev->readcb)(bufev, bufev->cbarg);
        return;
@@ -111,8 +146,11 @@ bufferevent_writecb(int fd, short event, void *arg)
        if (EVBUFFER_LENGTH(bufev->output) != 0)
                bufferevent_add(&bufev->ev_write, bufev->timeout_write);
 
-       /* Invoke the user callback if our buffer is drained */
-       if (EVBUFFER_LENGTH(bufev->output) == 0)
+       /*
+        * Invoke the user callback if our buffer is drained or below the
+        * low watermark.
+        */
+       if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
                (*bufev->writecb)(bufev, bufev->cbarg);
 
        return;
@@ -273,3 +311,26 @@ bufferevent_settimeout(struct bufferevent *bufev,
        bufev->timeout_read = timeout_read;
        bufev->timeout_write = timeout_write;
 }
+
+/*
+ * Sets the water marks
+ */
+
+void
+bufferevent_setwatermark(struct bufferevent *bufev, short events,
+    size_t lowmark, size_t highmark)
+{
+       if (events & EV_READ) {
+               bufev->wm_read.low = lowmark;
+               bufev->wm_read.high = highmark;
+       }
+
+       if (events & EV_WRITE) {
+               bufev->wm_write.low = lowmark;
+               bufev->wm_write.high = highmark;
+       }
+
+       /* If the watermarks changed then see if we should call read again */
+       bufferevent_read_pressure_cb(bufev->input,
+           0, EVBUFFER_LENGTH(bufev->input), bufev);
+}
diff --git a/event.c b/event.c
index 86a17e6d46a5fa951a9274ed92c7f398c5f40fb2..92eadde76d51bdd9f6b78799aaf8edb8ece71a15 100644 (file)
--- a/event.c
+++ b/event.c
@@ -327,6 +327,10 @@ event_once(int fd, short events,
        struct event_once *eonce;
        struct timeval etv;
 
+       /* We cannot support signals that just fire once */
+       if (events & EV_SIGNAL)
+               return (-1);
+
        if ((eonce = calloc(1, sizeof(struct event_once))) == NULL)
                return (-1);
 
@@ -387,8 +391,10 @@ event_pending(struct event *ev, short event, struct timeval *tv)
                flags |= ev->ev_res;
        if (ev->ev_flags & EVLIST_TIMEOUT)
                flags |= EV_TIMEOUT;
+       if (ev->ev_flags & EVLIST_SIGNAL)
+               flags |= EV_SIGNAL;
 
-       event &= (EV_TIMEOUT|EV_READ|EV_WRITE);
+       event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL);
 
        /* See if there is a timeout that we should report */
        if (tv != NULL && (flags & event & EV_TIMEOUT))
diff --git a/event.h b/event.h
index dd03c538d0e056d60d4cde6d9dc28cd4be91eb66..eddb0e96b58b625dcef44a2201bf816ed45c7803 100644 (file)
--- a/event.h
+++ b/event.h
@@ -173,6 +173,9 @@ struct evbuffer {
 
        size_t totallen;
        size_t off;
+
+       void (*cb)(struct evbuffer *, size_t, size_t, void *);
+       void *cbarg;
 };
 
 /* Just for error reporting - use other constants otherwise */
@@ -186,6 +189,11 @@ struct bufferevent;
 typedef void (*evbuffercb)(struct bufferevent *, void *);
 typedef void (*everrorcb)(struct bufferevent *, short what, void *);
 
+struct event_watermark {
+       size_t low;
+       size_t high;
+};
+
 struct bufferevent {
        struct event ev_read;
        struct event ev_write;
@@ -193,6 +201,9 @@ struct bufferevent {
        struct evbuffer *input;
        struct evbuffer *output;
 
+       struct event_watermark wm_read;
+       struct event_watermark wm_write;
+
        evbuffercb readcb;
        evbuffercb writecb;
        everrorcb errorcb;
@@ -222,7 +233,6 @@ void bufferevent_settimeout(struct bufferevent *bufev,
 
 struct evbuffer *evbuffer_new(void);
 void evbuffer_free(struct evbuffer *);
-void evbuffer_free(struct evbuffer *);
 int evbuffer_add(struct evbuffer *, u_char *, size_t);
 int evbuffer_add_buffer(struct evbuffer *, struct evbuffer *);
 int evbuffer_add_printf(struct evbuffer *, char *fmt, ...);
@@ -230,6 +240,7 @@ void evbuffer_drain(struct evbuffer *, size_t);
 int evbuffer_write(struct evbuffer *, int);
 int evbuffer_read(struct evbuffer *, int, int);
 u_char *evbuffer_find(struct evbuffer *, u_char *, size_t);
+void evbuffer_setcb(struct evbuffer *, void (*)(struct evbuffer *, size_t, size_t, void *), void *);
 
 #ifdef __cplusplus
 }