From fbf01c7f044f143046df7b9c566b5f49770ba0cb Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Sun, 4 Apr 2004 02:20:21 +0000 Subject: [PATCH] support for low and high watermarks svn:r101 --- buffer.c | 22 +++++++++++++++++- evbuffer.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- event.c | 8 ++++++- event.h | 13 ++++++++++- 4 files changed, 103 insertions(+), 5 deletions(-) diff --git a/buffer.c b/buffer.c index ad608b77..d8bf08fd 100644 --- a/buffer.c +++ b/buffer.c @@ -88,6 +88,7 @@ int evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen) { size_t need = buf->off + datlen; + size_t oldoff = buf->off; if (buf->totallen < need) { void *newbuf; @@ -108,19 +109,30 @@ evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen) memcpy(buf->buffer + buf->off, data, datlen); buf->off += datlen; + if (datlen && buf->cb != NULL) + (*buf->cb)(buf, oldoff, buf->off, buf->cbarg); + return (0); } void evbuffer_drain(struct evbuffer *buf, size_t len) { + size_t oldoff = buf->off; + if (len >= buf->off) { buf->off = 0; - return; + goto done; } memmove(buf->buffer, buf->buffer + len, buf->off - len); buf->off -= len; + + done: + /* Tell someone about changes in this buffer */ + if (buf->off != oldoff && buf->cb != NULL) + (*buf->cb)(buf, oldoff, buf->off, buf->cbarg); + } int @@ -176,3 +188,11 @@ evbuffer_find(struct evbuffer *buffer, u_char *what, size_t len) return (NULL); } + +void evbuffer_setcb(struct evbuffer *buffer, + void (*cb)(struct evbuffer *, size_t, size_t, void *), + void *cbarg) +{ + buffer->cb = cb; + buffer->cbarg = cbarg; +} diff --git a/evbuffer.c b/evbuffer.c index d765ce5f..b476b5e2 100644 --- a/evbuffer.c +++ b/evbuffer.c @@ -41,12 +41,34 @@ bufferevent_add(struct event *ev, int timeout) return (event_add(ev, ptv)); } +/* + * This callback is executed when the size of the input buffer changes. + * We use it to apply back pressure on the reading side. + */ + +void +bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, + void *arg) { + struct bufferevent *bufev = arg; + /* + * If we are below the watermak then reschedule reading if it's + * still enabled. + */ + if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { + evbuffer_setcb(buf, NULL, NULL); + + if (bufev->enabled & EV_READ) + bufferevent_add(&bufev->ev_read, bufev->timeout_read); + } +} + static void bufferevent_readcb(int fd, short event, void *arg) { struct bufferevent *bufev = arg; int res = 0; short what = EVBUFFER_READ; + size_t len; if (event == EV_TIMEOUT) { what |= EVBUFFER_TIMEOUT; @@ -69,6 +91,19 @@ bufferevent_readcb(int fd, short event, void *arg) bufferevent_add(&bufev->ev_read, bufev->timeout_read); + /* See if this callbacks meets the water marks */ + len = EVBUFFER_LENGTH(bufev->input); + if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) + return; + if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { + struct evbuffer *buf = bufev->input; + event_del(&bufev->ev_read); + + /* Now schedule a callback for us */ + evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); + return; + } + /* Invoke the user callback - must always be called last */ (*bufev->readcb)(bufev, bufev->cbarg); return; @@ -111,8 +146,11 @@ bufferevent_writecb(int fd, short event, void *arg) if (EVBUFFER_LENGTH(bufev->output) != 0) bufferevent_add(&bufev->ev_write, bufev->timeout_write); - /* Invoke the user callback if our buffer is drained */ - if (EVBUFFER_LENGTH(bufev->output) == 0) + /* + * Invoke the user callback if our buffer is drained or below the + * low watermark. + */ + if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) (*bufev->writecb)(bufev, bufev->cbarg); return; @@ -273,3 +311,26 @@ bufferevent_settimeout(struct bufferevent *bufev, bufev->timeout_read = timeout_read; bufev->timeout_write = timeout_write; } + +/* + * Sets the water marks + */ + +void +bufferevent_setwatermark(struct bufferevent *bufev, short events, + size_t lowmark, size_t highmark) +{ + if (events & EV_READ) { + bufev->wm_read.low = lowmark; + bufev->wm_read.high = highmark; + } + + if (events & EV_WRITE) { + bufev->wm_write.low = lowmark; + bufev->wm_write.high = highmark; + } + + /* If the watermarks changed then see if we should call read again */ + bufferevent_read_pressure_cb(bufev->input, + 0, EVBUFFER_LENGTH(bufev->input), bufev); +} diff --git a/event.c b/event.c index 86a17e6d..92eadde7 100644 --- a/event.c +++ b/event.c @@ -327,6 +327,10 @@ event_once(int fd, short events, struct event_once *eonce; struct timeval etv; + /* We cannot support signals that just fire once */ + if (events & EV_SIGNAL) + return (-1); + if ((eonce = calloc(1, sizeof(struct event_once))) == NULL) return (-1); @@ -387,8 +391,10 @@ event_pending(struct event *ev, short event, struct timeval *tv) flags |= ev->ev_res; if (ev->ev_flags & EVLIST_TIMEOUT) flags |= EV_TIMEOUT; + if (ev->ev_flags & EVLIST_SIGNAL) + flags |= EV_SIGNAL; - event &= (EV_TIMEOUT|EV_READ|EV_WRITE); + event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL); /* See if there is a timeout that we should report */ if (tv != NULL && (flags & event & EV_TIMEOUT)) diff --git a/event.h b/event.h index dd03c538..eddb0e96 100644 --- a/event.h +++ b/event.h @@ -173,6 +173,9 @@ struct evbuffer { size_t totallen; size_t off; + + void (*cb)(struct evbuffer *, size_t, size_t, void *); + void *cbarg; }; /* Just for error reporting - use other constants otherwise */ @@ -186,6 +189,11 @@ struct bufferevent; typedef void (*evbuffercb)(struct bufferevent *, void *); typedef void (*everrorcb)(struct bufferevent *, short what, void *); +struct event_watermark { + size_t low; + size_t high; +}; + struct bufferevent { struct event ev_read; struct event ev_write; @@ -193,6 +201,9 @@ struct bufferevent { struct evbuffer *input; struct evbuffer *output; + struct event_watermark wm_read; + struct event_watermark wm_write; + evbuffercb readcb; evbuffercb writecb; everrorcb errorcb; @@ -222,7 +233,6 @@ void bufferevent_settimeout(struct bufferevent *bufev, struct evbuffer *evbuffer_new(void); void evbuffer_free(struct evbuffer *); -void evbuffer_free(struct evbuffer *); int evbuffer_add(struct evbuffer *, u_char *, size_t); int evbuffer_add_buffer(struct evbuffer *, struct evbuffer *); int evbuffer_add_printf(struct evbuffer *, char *fmt, ...); @@ -230,6 +240,7 @@ void evbuffer_drain(struct evbuffer *, size_t); int evbuffer_write(struct evbuffer *, int); int evbuffer_read(struct evbuffer *, int, int); u_char *evbuffer_find(struct evbuffer *, u_char *, size_t); +void evbuffer_setcb(struct evbuffer *, void (*)(struct evbuffer *, size_t, size_t, void *), void *); #ifdef __cplusplus } -- 2.50.1