evbuffer_add(struct evbuffer *buf, u_char *data, size_t datlen)
{
size_t need = buf->off + datlen;
+ size_t oldoff = buf->off;
if (buf->totallen < need) {
void *newbuf;
memcpy(buf->buffer + buf->off, data, datlen);
buf->off += datlen;
+ if (datlen && buf->cb != NULL)
+ (*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
return (0);
}
void
evbuffer_drain(struct evbuffer *buf, size_t len)
{
+ size_t oldoff = buf->off;
+
if (len >= buf->off) {
buf->off = 0;
- return;
+ goto done;
}
memmove(buf->buffer, buf->buffer + len, buf->off - len);
buf->off -= len;
+
+ done:
+ /* Tell someone about changes in this buffer */
+ if (buf->off != oldoff && buf->cb != NULL)
+ (*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
}
int
return (NULL);
}
+
+void evbuffer_setcb(struct evbuffer *buffer,
+ void (*cb)(struct evbuffer *, size_t, size_t, void *),
+ void *cbarg)
+{
+ buffer->cb = cb;
+ buffer->cbarg = cbarg;
+}
return (event_add(ev, ptv));
}
+/*
+ * This callback is executed when the size of the input buffer changes.
+ * We use it to apply back pressure on the reading side.
+ */
+
+void
+bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
+ void *arg) {
+ struct bufferevent *bufev = arg;
+ /*
+ * If we are below the watermak then reschedule reading if it's
+ * still enabled.
+ */
+ if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
+ evbuffer_setcb(buf, NULL, NULL);
+
+ if (bufev->enabled & EV_READ)
+ bufferevent_add(&bufev->ev_read, bufev->timeout_read);
+ }
+}
+
static void
bufferevent_readcb(int fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
int res = 0;
short what = EVBUFFER_READ;
+ size_t len;
if (event == EV_TIMEOUT) {
what |= EVBUFFER_TIMEOUT;
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
+ /* See if this callbacks meets the water marks */
+ len = EVBUFFER_LENGTH(bufev->input);
+ if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
+ return;
+ if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
+ struct evbuffer *buf = bufev->input;
+ event_del(&bufev->ev_read);
+
+ /* Now schedule a callback for us */
+ evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
+ return;
+ }
+
/* Invoke the user callback - must always be called last */
(*bufev->readcb)(bufev, bufev->cbarg);
return;
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
- /* Invoke the user callback if our buffer is drained */
- if (EVBUFFER_LENGTH(bufev->output) == 0)
+ /*
+ * Invoke the user callback if our buffer is drained or below the
+ * low watermark.
+ */
+ if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
return;
bufev->timeout_read = timeout_read;
bufev->timeout_write = timeout_write;
}
+
+/*
+ * Sets the water marks
+ */
+
+void
+bufferevent_setwatermark(struct bufferevent *bufev, short events,
+ size_t lowmark, size_t highmark)
+{
+ if (events & EV_READ) {
+ bufev->wm_read.low = lowmark;
+ bufev->wm_read.high = highmark;
+ }
+
+ if (events & EV_WRITE) {
+ bufev->wm_write.low = lowmark;
+ bufev->wm_write.high = highmark;
+ }
+
+ /* If the watermarks changed then see if we should call read again */
+ bufferevent_read_pressure_cb(bufev->input,
+ 0, EVBUFFER_LENGTH(bufev->input), bufev);
+}
struct event_once *eonce;
struct timeval etv;
+ /* We cannot support signals that just fire once */
+ if (events & EV_SIGNAL)
+ return (-1);
+
if ((eonce = calloc(1, sizeof(struct event_once))) == NULL)
return (-1);
flags |= ev->ev_res;
if (ev->ev_flags & EVLIST_TIMEOUT)
flags |= EV_TIMEOUT;
+ if (ev->ev_flags & EVLIST_SIGNAL)
+ flags |= EV_SIGNAL;
- event &= (EV_TIMEOUT|EV_READ|EV_WRITE);
+ event &= (EV_TIMEOUT|EV_READ|EV_WRITE|EV_SIGNAL);
/* See if there is a timeout that we should report */
if (tv != NULL && (flags & event & EV_TIMEOUT))
size_t totallen;
size_t off;
+
+ void (*cb)(struct evbuffer *, size_t, size_t, void *);
+ void *cbarg;
};
/* Just for error reporting - use other constants otherwise */
typedef void (*evbuffercb)(struct bufferevent *, void *);
typedef void (*everrorcb)(struct bufferevent *, short what, void *);
+struct event_watermark {
+ size_t low;
+ size_t high;
+};
+
struct bufferevent {
struct event ev_read;
struct event ev_write;
struct evbuffer *input;
struct evbuffer *output;
+ struct event_watermark wm_read;
+ struct event_watermark wm_write;
+
evbuffercb readcb;
evbuffercb writecb;
everrorcb errorcb;
struct evbuffer *evbuffer_new(void);
void evbuffer_free(struct evbuffer *);
-void evbuffer_free(struct evbuffer *);
int evbuffer_add(struct evbuffer *, u_char *, size_t);
int evbuffer_add_buffer(struct evbuffer *, struct evbuffer *);
int evbuffer_add_printf(struct evbuffer *, char *fmt, ...);
int evbuffer_write(struct evbuffer *, int);
int evbuffer_read(struct evbuffer *, int, int);
u_char *evbuffer_find(struct evbuffer *, u_char *, size_t);
+void evbuffer_setcb(struct evbuffer *, void (*)(struct evbuffer *, size_t, size_t, void *), void *);
#ifdef __cplusplus
}