#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
+#include <sys/queue.h>
#include <errno.h>
#include <stdio.h>
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
+#include "log.h"
#include "mm-internal.h"
+#include "bufferevent-internal.h"
/* prototypes */
-void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
+static void bufferevent_read_pressure_cb(
+ struct evbuffer *, size_t, size_t, void *);
+static int bufferevent_process_filters(
+ struct bufferevent_filter *, struct evbuffer *,
+ enum bufferevent_filter_state);
static int
bufferevent_add(struct event *ev, int timeout)
* We use it to apply back pressure on the reading side.
*/
-void
+static void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg) {
struct bufferevent *bufev = arg;
}
}
+static void
+bufferevent_read_closure(struct bufferevent *bufev, int progress)
+{
+ size_t len;
+
+ bufferevent_add(&bufev->ev_read, bufev->timeout_read);
+
+ /* nothing user visible changed? */
+ if (!progress)
+ return;
+
+ /* See if this callbacks meets the water marks */
+ len = EVBUFFER_LENGTH(bufev->input);
+ if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
+ return;
+
+ /* For read pressure, we use the buffer exposed to the users.
+ * Filters can arbitrarily change the data that users get to see,
+ * in particular, a user might select a watermark that is smaller
+ * then what a filter needs to make progress.
+ */
+ if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
+ event_del(&bufev->ev_read);
+
+ /* Now schedule a callback for us when the buffer changes */
+ evbuffer_setcb(bufev->input,
+ bufferevent_read_pressure_cb, bufev);
+ }
+
+ /* Invoke the user callback - must always be called last */
+ if (bufev->readcb != NULL)
+ (*bufev->readcb)(bufev, bufev->cbarg);
+}
+
static void
bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
{
struct bufferevent *bufev = arg;
- int res = 0;
+ struct evbuffer *input;
+ int res = 0, progress = 1;
short what = EVBUFFER_READ;
- size_t len;
int howmuch = -1;
if (event == EV_TIMEOUT) {
goto error;
}
+ if (TAILQ_FIRST(&bufev->input_filters) != NULL)
+ input = TAILQ_FIRST(&bufev->input_filters)->buffer;
+ else
+ input = bufev->input;
+
/*
* If we have a high watermark configured then we don't want to
* read more data than would make us reach the watermark.
*/
if (bufev->wm_read.high != 0) {
- howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
+ howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input);
/* we might have lowered the watermark, stop reading */
if (howmuch <= 0) {
- struct evbuffer *buf = bufev->input;
event_del(&bufev->ev_read);
- evbuffer_setcb(buf,
+ evbuffer_setcb(input,
bufferevent_read_pressure_cb, bufev);
return;
}
}
- res = evbuffer_read(bufev->input, fd, howmuch);
+ res = evbuffer_read(input, fd, howmuch);
+
if (res == -1) {
if (errno == EAGAIN || errno == EINTR)
goto reschedule;
what |= EVBUFFER_EOF;
}
+ if (TAILQ_FIRST(&bufev->input_filters) != NULL) {
+ int state = BEV_NORMAL;
+ if (what & EVBUFFER_EOF)
+ state = BEV_FLUSH;
+ /* XXX(niels): what to do about EVBUFFER_ERROR? */
+ progress = bufferevent_process_filters(
+ TAILQ_FIRST(&bufev->input_filters),
+ bufev->input,
+ state);
+
+ /* propagate potential errors to the user */
+ if (progress == -1) {
+ res = -1;
+ what |= EVBUFFER_ERROR;
+ }
+ }
+
if (res <= 0)
goto error;
- bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-
- /* See if this callbacks meets the water marks */
- len = EVBUFFER_LENGTH(bufev->input);
- if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
- return;
- if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
- struct evbuffer *buf = bufev->input;
- event_del(&bufev->ev_read);
-
- /* Now schedule a callback for us when the buffer changes */
- evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
- }
-
- /* Invoke the user callback - must always be called last */
- if (bufev->readcb != NULL)
- (*bufev->readcb)(bufev, bufev->cbarg);
+ bufferevent_read_closure(bufev, progress);
return;
reschedule:
*/
bufev->enabled = EV_WRITE;
+ TAILQ_INIT(&bufev->input_filters);
+ TAILQ_INIT(&bufev->output_filters);
+
return (bufev);
}
void
bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
+ struct bufferevent_filter *filter;
+
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
event_base_set(bufev->ev_base, &bufev->ev_write);
}
+ /* we need to free all filter contexts and then init them again */
+ TAILQ_FOREACH(filter, &bufev->input_filters, next) {
+ if (filter->free_context)
+ filter->free_context(filter->ctx);
+ if (filter->init_context)
+ filter->init_context(filter->ctx);
+ }
+
+ TAILQ_FOREACH(filter, &bufev->output_filters, next) {
+ if (filter->free_context)
+ filter->free_context(filter->ctx);
+ if (filter->init_context)
+ filter->init_context(filter->ctx);
+ }
+
/* might have to manually trigger event registration */
}
struct evbuffer *
bufferevent_output(struct bufferevent *bufev)
{
- return (bufev->output);
+ return TAILQ_FIRST(&bufev->output_filters) != NULL ?
+ TAILQ_FIRST(&bufev->output_filters)->buffer :
+ bufev->output;
}
int
void
bufferevent_free(struct bufferevent *bufev)
{
+ struct bufferevent_filter *filter;
+
event_del(&bufev->ev_read);
event_del(&bufev->ev_write);
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
+ /* free input and output filters */
+ while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) {
+ bufferevent_filter_remove(bufev, BEV_INPUT, filter);
+
+ bufferevent_filter_free(filter);
+ }
+
+ while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) {
+ bufferevent_filter_remove(bufev, BEV_OUTPUT, filter);
+
+ bufferevent_filter_free(filter);
+ }
+
mm_free(bufev);
}
-
-static inline void
+/*
+ * Executes filters on the written data and schedules a network write if
+ * necessary.
+ */
+static inline int
bufferevent_write_closure(struct bufferevent *bufev, int progress)
{
+ /* if no data was written, we do not need to do anything */
+ if (!progress)
+ return (0);
+
+ if (TAILQ_FIRST(&bufev->output_filters) != NULL) {
+ progress = bufferevent_process_filters(
+ TAILQ_FIRST(&bufev->output_filters),
+ bufev->output, BEV_NORMAL);
+ if (progress == -1) {
+ (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
+ return (-1);
+ }
+ }
+
/* If everything is okay, we need to schedule a write */
- if (progress && (bufev->enabled & EV_WRITE))
+ if (bufev->enabled & EV_WRITE)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);
+
+ return (0);
}
/*
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
- if (evbuffer_add(bufev->output, data, size) == -1)
- return (-1);
+ struct evbuffer *output;
- bufferevent_write_closure(bufev, size > 0);
+ if (TAILQ_FIRST(&bufev->output_filters) != NULL)
+ output = TAILQ_FIRST(&bufev->output_filters)->buffer;
+ else
+ output = bufev->output;
- return (0);
+ if (evbuffer_add(output, data, size) == -1)
+ return (-1);
+
+ return (bufferevent_write_closure(bufev, size > 0));
}
int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
- int len = EVBUFFER_LENGTH(bufev->output);
+ int len = EVBUFFER_LENGTH(buf);
+ struct evbuffer *output;
- if (evbuffer_add_buffer(bufev->output, buf) == -1)
- return (-1);
+ if (TAILQ_FIRST(&bufev->output_filters) != NULL)
+ output = TAILQ_FIRST(&bufev->output_filters)->buffer;
+ else
+ output = bufev->output;
- bufferevent_write_closure(bufev, len > 0);
+ if (evbuffer_add_buffer(output, buf) == -1)
+ return (-1);
- return (0);
+ return (bufferevent_write_closure(bufev, len > 0));
}
size_t
res = event_base_set(base, &bufev->ev_write);
return (res);
}
+
+/*
+ * Filtering stuff
+ */
+
+struct bufferevent_filter *
+bufferevent_filter_new(
+ void (*init_context)(void *ctx),
+ void (*free_context)(void *ctx),
+ enum bufferevent_filter_result (*process)(
+ struct evbuffer *src, struct evbuffer *dst,
+ enum bufferevent_filter_state flags, void *ctx), void *ctx)
+{
+ struct bufferevent_filter *filter;
+
+ if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL)
+ return (NULL);
+
+ if ((filter->buffer = evbuffer_new()) == NULL) {
+ mm_free(filter);
+ return (NULL);
+ }
+
+ filter->init_context = init_context;
+ filter->free_context = free_context;
+ filter->process = process;
+ filter->ctx = ctx;
+
+ return (filter);
+}
+
+void
+bufferevent_filter_free(struct bufferevent_filter *filter)
+{
+ evbuffer_free(filter->buffer);
+ mm_free(filter);
+}
+
+void
+bufferevent_filter_insert(struct bufferevent *bufev,
+ enum bufferevent_filter_type filter_type,
+ struct bufferevent_filter *filter)
+{
+ switch (filter_type) {
+ case BEV_INPUT:
+ TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next);
+ break;
+ case BEV_OUTPUT:
+ TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next);
+ break;
+ default:
+ event_errx(1, "illegal filter type %d", filter_type);
+ }
+
+ if (filter->init_context)
+ filter->init_context(filter->ctx);
+}
+
+void
+bufferevent_filter_remove(struct bufferevent *bufev,
+ enum bufferevent_filter_type filter_type,
+ struct bufferevent_filter *filter)
+{
+ switch (filter_type) {
+ case BEV_INPUT:
+ TAILQ_REMOVE(&bufev->input_filters, filter, next);
+ break;
+ case BEV_OUTPUT:
+ TAILQ_REMOVE(&bufev->output_filters, filter, next);
+ break;
+ default:
+ event_errx(1, "illegal filter type %d", filter_type);
+ }
+
+ evbuffer_drain(filter->buffer, -1);
+
+ if (filter->free_context)
+ filter->free_context(filter->ctx);
+
+}
+
+static int
+bufferevent_process_filters(
+ struct bufferevent_filter *filter, struct evbuffer *final,
+ enum bufferevent_filter_state state)
+{
+ struct evbuffer *src, *dst;
+ struct bufferevent_filter *next;
+ int len = EVBUFFER_LENGTH(final);
+
+ for (; filter != NULL; filter = next) {
+ int res;
+
+ next = TAILQ_NEXT(filter, next);
+ src = filter->buffer;
+ dst = next != NULL ? next->buffer : final;
+
+ res = (*filter->process)(src, dst, state, filter->ctx);
+
+ /* an error causes complete termination of the bufferevent */
+ if (res == BEV_ERROR)
+ return (-1);
+
+ /* a read filter indicated that it cannot produce
+ * further data, we do not need to invoke any
+ * subsequent filters. Unless, a flush or something
+ * similar was specifically requested.
+ */
+ if (res == BEV_NEED_MORE && state == BEV_NORMAL)
+ return (0);
+ }
+
+ /* we made user visible progress if the buffer size changed */
+ return (EVBUFFER_LENGTH(final) != len);
+}
+
+int
+bufferevent_trigger_filter(struct bufferevent *bufev,
+ struct bufferevent_filter *filter, int iotype,
+ enum bufferevent_filter_state state)
+{
+ struct evbuffer *dst = iotype == BEV_INPUT ?
+ bufev->input : bufev->output;
+ int progress;
+
+ /* trigger all filters if filter is not specified */
+ if (filter == NULL) {
+ struct bufferevent_filterq *head = BEV_INPUT ?
+ &bufev->input_filters : &bufev->output_filters;
+ filter = TAILQ_FIRST(head);
+ }
+
+ progress = bufferevent_process_filters(filter, dst, state);
+ if (progress == -1) {
+ (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
+ return (-1);
+ }
+
+ switch (iotype) {
+ case BEV_INPUT:
+ bufferevent_read_closure(bufev, progress);
+ break;
+ case BEV_OUTPUT:
+ if (progress && (bufev->enabled & EV_WRITE))
+ bufferevent_add(
+ &bufev->ev_write, bufev->timeout_write);
+ break;
+ default:
+ event_errx(1, "Illegal bufferevent iotype: %d", iotype);
+ }
+
+ return (0);
+}
/**
Returns the outut buffer.
+ When filters are being used, the filters need to be manually
+ triggered if the output buffer was manipulated.
+
@param bufev the buffervent from which to get the evbuffer
@return the evbuffer object for the output buffer
*/
*/
int bufferevent_enable(struct bufferevent *bufev, short event);
-
/**
Disable a bufferevent.
#define EVBUFFER_INPUT(x) bufferevent_input(x)
#define EVBUFFER_OUTPUT(x) bufferevent_output(x)
+/**
+ Support for filtering input and output of bufferevents.
+ */
+
+/**
+ Flags that can be passed into filters to let them know how to
+ deal with the incoming data.
+*/
+enum bufferevent_filter_state {
+ /** usually set when processing data */
+ BEV_NORMAL = 0,
+
+ /** encountered EOF on read or done sending data */
+ BEV_FLUSH = 1,
+};
+
+/**
+ Values that filters can return.
+ */
+enum bufferevent_filter_result {
+ /** everything is okay */
+ BEV_OK = 0,
+
+ /** the filter needs to read more data before output */
+ BEV_NEED_MORE = 1,
+
+ /** the filter enountered a critical error, no further data
+ can be processed. */
+ BEV_ERROR = 2
+};
+
+struct bufferevent_filter;
+
+/**
+ Creates a new filtering object for a bufferevent.
+
+ Filters can be used to implement compression, authentication, rate limiting,
+ etc. for bufferevents. Filters can be associated with the input or output
+ path or both. Filters need to be inserted with bufferevent_filter_insert()
+ on either the input or output path.
+
+ For example, when implementing compression, both an input and an
+ output filters are required. The output filter compress all output
+ as it passes along whereas the input filter decompresses all input as
+ it is being read from the network.
+
+ Some filters may require specificaly behavior such as flushing their buffers
+ on EOF. To allom them to do that, a bufferevent will invoke the filter
+ with BEV_FLUSH to let it know that EOF has been reached.
+
+ When a filter needs more data before it can output any data, it may return
+ BEV_NEED_MORE in which case the filter chain is being interrupted until
+ more data arrives. A filter can indicate a fatal error by returning
+ BEV_ERROR. Otherwise, it should return BEV_OK.
+
+ @param init_context an optional function that initializes the ctx parameter.
+ @param free_context an optional function to free memory associated with the
+ ctx parameter.
+ @param process the filtering function that should be invokved either during
+ input or output depending on where the filter should be attached.
+ @param ctx additional context that can be passed to the process function
+ @return a bufferevent_filter object that can subsequently be installed
+*/
+struct bufferevent_filter *bufferevent_filter_new(
+ void (*init_context)(void *),
+ void (*free_context)(void *),
+ enum bufferevent_filter_result (*process)(
+ struct evbuffer *src, struct evbuffer *dst,
+ enum bufferevent_filter_state state, void *ctx), void *ctx);
+
+/**
+ Frees the filter object.
+
+ It must have been removed from the bufferevent before it can be freed.
+
+ @param filter the filter to be freed
+ @see bufferevent_filter_remove()
+*/
+void bufferevent_filter_free(struct bufferevent_filter *filter);
+
+/** Filter types for inserting or removing filters */
+enum bufferevent_filter_type {
+ /** filter is being used for input */
+ BEV_INPUT = 0,
+
+ /** filter is being used for output */
+ BEV_OUTPUT = 1
+};
+
+/**
+ Inserts a filter into the processing of data for bufferevent.
+
+ A filter can be inserted only once. It can not be used again for
+ another insert unless it have been removed via
+ bufferevent_filter_remove() first.
+
+ Input filters are inserted at the end, output filters at the
+ beginning of the queue.
+
+ @param bufev the bufferevent object into which to install the filter
+ @param filter_type either BEV_INPUT or BEV_OUTPUT
+ @param filter the filter object
+ @see bufferevent_filter_remove()
+ */
+void bufferevent_filter_insert(struct bufferevent *bufev,
+ enum bufferevent_filter_type filter_type,
+ struct bufferevent_filter *filter);
+
+/**
+ Removes a filter from the bufferevent.
+
+ A filter should be flushed via buffervent_trigger_filter before removing
+ it from a bufferevent. Any remaining intermediate buffer data is going
+ to be lost.
+
+ @param bufev the bufferevent object from which to remove the filter
+ @param filter_type either BEV_INPUT or BEV_OUTPUT
+ @param filter the filter object or NULL to trigger all filters
+ @see bufferevent_trigger_filter()
+*/
+void bufferevent_filter_remove(struct bufferevent *bufev,
+ enum bufferevent_filter_type filter_type,
+ struct bufferevent_filter *filter);
+
+/**
+ Triggers the filter chain the specified filter to produce more
+ data is possible. This is primarily for time-based filters such
+ as rate-limiting to produce more data as time passes.
+
+ @param bufev the bufferevent object to which the filter belongs
+ @param filter the bufferevent filter at which to start
+ @param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter
+ was installed
+ @param state either BEV_NORMAL or BEV_FLUSH
+ @return -1 on failure, 0 if no data was produces, 1 if data was produced
+ */
+
+int
+bufferevent_trigger_filter(struct bufferevent *bufev,
+ struct bufferevent_filter *filter, int iotype,
+ enum bufferevent_filter_state state);
#ifdef __cplusplus
}
cleanup_test();
}
+/*
+ * Test bufferevent filters
+ */
+
+/* strip an 'x' from each byte */
+
+static enum bufferevent_filter_result
+bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
+ enum bufferevent_filter_state state, void *ctx)
+{
+ const unsigned char *buffer;
+ int i;
+
+ if (state == BEV_FREE_DATA)
+ return (BEV_OK);
+
+ buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
+ for (i = 0; i < EVBUFFER_LENGTH(src); i += 2) {
+ assert(buffer[i] == 'x');
+ evbuffer_add(dst, buffer + i + 1, 1);
+
+ if (i + 2 > EVBUFFER_LENGTH(src))
+ break;
+ }
+
+ evbuffer_drain(src, i);
+ return (BEV_OK);
+}
+
+/* add an 'x' before each byte */
+
+static enum bufferevent_filter_result
+bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
+ enum bufferevent_filter_state state, void *ctx)
+{
+ const unsigned char *buffer;
+ int i;
+
+ if (state == BEV_FREE_DATA)
+ return (BEV_OK);
+
+ buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
+ for (i = 0; i < EVBUFFER_LENGTH(src); ++i) {
+ evbuffer_add(dst, "x", 1);
+ evbuffer_add(dst, buffer + i, 1);
+ }
+
+ evbuffer_drain(src, EVBUFFER_LENGTH(src));
+ return (BEV_OK);
+}
+
+static void
+test_bufferevent_filters(void)
+{
+ struct bufferevent *bev1, *bev2;
+ struct bufferevent_filter *finput, *foutput;
+ char buffer[8333];
+ int i;
+
+ setup_test("Bufferevent Filters: ");
+
+ bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL);
+ bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL);
+
+ bufferevent_disable(bev1, EV_READ);
+ bufferevent_enable(bev2, EV_READ);
+
+ for (i = 0; i < sizeof(buffer); i++)
+ buffer[i] = i;
+
+ /* insert some filters */
+ finput = bufferevent_filter_new(
+ NULL, NULL,bufferevent_input_filter, NULL);
+ foutput = bufferevent_filter_new(
+ NULL, NULL, bufferevent_output_filter, NULL);
+
+ bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput);
+ bufferevent_filter_insert(bev2, BEV_INPUT, finput);
+
+ bufferevent_write(bev1, buffer, sizeof(buffer));
+
+ event_dispatch();
+
+ bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput);
+ bufferevent_filter_free(foutput);
+
+ bufferevent_free(bev1);
+ bufferevent_free(bev2);
+
+ if (test_ok != 2)
+ test_ok = 0;
+
+ cleanup_test();
+}
+
struct test_pri_event {
struct event ev;
int count;
test_bufferevent();
test_bufferevent_watermarks();
+ test_bufferevent_filters();
test_free_active_base();