]> granicus.if.org Git - libevent/commitdiff
support input/output filters for bufferevents
authorNiels Provos <provos@gmail.com>
Wed, 30 Apr 2008 00:09:16 +0000 (00:09 +0000)
committerNiels Provos <provos@gmail.com>
Wed, 30 Apr 2008 00:09:16 +0000 (00:09 +0000)
svn:r748

ChangeLog
Makefile.am
bufferevent-internal.h [new file with mode: 0644]
evbuffer.c
include/event2/bufferevent.h
include/event2/bufferevent_struct.h
test/regress.c

index 43925821daeccb5ca18a934eb7b19c25d4a976c5..3a5fa85baebf965503b04e20a778b0c35ec0c0ef 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -79,6 +79,7 @@ Changes in current version:
  o convert evhttp_connection to use bufferevents.
  o use libevent's internal timercmp on all platforms, to avoid bugs on old platforms where timercmp(a,b,<=) is buggy.
  o Remove the never-exported, never-used evhttp_hostportfile function.
+ o Support input/output filters for bufferevents; somewhat similar to libio's model.  This will allow us to implement SSL, compression, etc, transparently to users of bufferevents such as the http layer.
        
 Changes in 1.4.0:
  o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr.
index 5fab46a07f6f88d5ec613a2c58230d2a04243343..22d9679019e90d3dca7e0d14a65cc39b044c5bf0 100644 (file)
@@ -81,7 +81,9 @@ event-config.h: config.h
            -e 's/#ifndef /#ifndef _EVENT_/' < config.h >> $@
        echo "#endif" >> $@
 
-CORE_SRC = event.c buffer.c evbuffer.c log.c evutil.c $(SYS_SRC)
+CORE_SRC = event.c buffer.c evbuffer-internal.h evbuffer.c \
+       bufferevent-internal.h \
+       log.c evutil.c $(SYS_SRC)
 EXTRA_SRC = event_tagging.c http.c evhttp.h http-internal.h evdns.c \
        evdns.h evrpc.c evrpc.h evrpc-internal.h mm-internal.h \
        strlcpy.c strlcpy-internal.h strlcpy-internal.h
diff --git a/bufferevent-internal.h b/bufferevent-internal.h
new file mode 100644 (file)
index 0000000..b8803ea
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2008 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef _BUFFEREVENT_INTERNAL_H_
+#define _BUFFEREVENT_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "config.h"
+#include "evutil.h"
+
+struct bufferevent_filter {
+       /** allows chaining of filters */
+       TAILQ_ENTRY(bufferevent_filter) (next);
+
+       /** used for intermediary state either on the input or output path */
+       struct evbuffer *buffer;
+
+       /** initializes the context provided to process */
+       void (*init_context)(void *);
+
+       /** frees any context related to ctx */
+       void (*free_context)(void *);
+
+       enum bufferevent_filter_result (*process)(
+               struct evbuffer *src, struct evbuffer *dst,
+               enum bufferevent_filter_state flags, void *ctx);
+
+       void *ctx;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _BUFFEREVENT_INTERNAL_H_ */
index 9442da6792015c0e31b7ba456b17ba2422bfbd4f..faf6cdb1682c87ea5c20015ab445815f9b0e01ca 100644 (file)
@@ -34,6 +34,7 @@
 #ifdef HAVE_SYS_TIME_H
 #include <sys/time.h>
 #endif
+#include <sys/queue.h>
 
 #include <errno.h>
 #include <stdio.h>
 #include "event2/buffer.h"
 #include "event2/bufferevent_struct.h"
 #include "event2/event.h"
+#include "log.h"
 #include "mm-internal.h"
+#include "bufferevent-internal.h"
 
 /* prototypes */
 
-void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
+static void bufferevent_read_pressure_cb(
+       struct evbuffer *, size_t, size_t, void *);
+static int bufferevent_process_filters(
+       struct bufferevent_filter *, struct evbuffer *,
+       enum bufferevent_filter_state);
 
 static int
 bufferevent_add(struct event *ev, int timeout)
@@ -77,7 +84,7 @@ bufferevent_add(struct event *ev, int timeout)
  * We use it to apply back pressure on the reading side.
  */
 
-void
+static void
 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
     void *arg) {
        struct bufferevent *bufev = arg;
@@ -93,13 +100,47 @@ bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
        }
 }
 
+static void
+bufferevent_read_closure(struct bufferevent *bufev, int progress)
+{
+       size_t len;
+
+       bufferevent_add(&bufev->ev_read, bufev->timeout_read);
+
+       /* nothing user visible changed? */
+       if (!progress)
+               return;
+
+       /* See if this callbacks meets the water marks */
+       len = EVBUFFER_LENGTH(bufev->input);
+       if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
+               return;
+
+       /* For read pressure, we use the buffer exposed to the users.
+        * Filters can arbitrarily change the data that users get to see,
+        * in particular, a user might select a watermark that is smaller
+        * then what a filter needs to make progress.
+        */
+       if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
+               event_del(&bufev->ev_read);
+
+               /* Now schedule a callback for us when the buffer changes */
+               evbuffer_setcb(bufev->input,
+                   bufferevent_read_pressure_cb, bufev);
+       }
+
+       /* Invoke the user callback - must always be called last */
+       if (bufev->readcb != NULL)
+               (*bufev->readcb)(bufev, bufev->cbarg);
+}
+
 static void
 bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
 {
        struct bufferevent *bufev = arg;
-       int res = 0;
+       struct evbuffer *input;
+       int res = 0, progress = 1;
        short what = EVBUFFER_READ;
-       size_t len;
        int howmuch = -1;
 
        if (event == EV_TIMEOUT) {
@@ -107,23 +148,28 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
                goto error;
        }
 
+       if (TAILQ_FIRST(&bufev->input_filters) != NULL)
+               input = TAILQ_FIRST(&bufev->input_filters)->buffer;
+       else
+               input = bufev->input;
+
        /*
         * If we have a high watermark configured then we don't want to
         * read more data than would make us reach the watermark.
         */
        if (bufev->wm_read.high != 0) {
-               howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
+               howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(input);
                /* we might have lowered the watermark, stop reading */
                if (howmuch <= 0) {
-                       struct evbuffer *buf = bufev->input;
                        event_del(&bufev->ev_read);
-                       evbuffer_setcb(buf,
+                       evbuffer_setcb(input,
                            bufferevent_read_pressure_cb, bufev);
                        return;
                }
        }
 
-       res = evbuffer_read(bufev->input, fd, howmuch);
+       res = evbuffer_read(input, fd, howmuch);
+
        if (res == -1) {
                if (errno == EAGAIN || errno == EINTR)
                        goto reschedule;
@@ -134,26 +180,27 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
                what |= EVBUFFER_EOF;
        }
 
+       if (TAILQ_FIRST(&bufev->input_filters) != NULL) {
+               int state = BEV_NORMAL;
+               if (what & EVBUFFER_EOF)
+                       state = BEV_FLUSH;
+               /* XXX(niels): what to do about EVBUFFER_ERROR? */
+               progress = bufferevent_process_filters(
+                       TAILQ_FIRST(&bufev->input_filters),
+                       bufev->input,
+                       state);
+
+               /* propagate potential errors to the user */
+               if (progress == -1) {
+                       res = -1;
+                       what |= EVBUFFER_ERROR;
+               }
+       }
+       
        if (res <= 0)
                goto error;
 
-       bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-
-       /* See if this callbacks meets the water marks */
-       len = EVBUFFER_LENGTH(bufev->input);
-       if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
-               return;
-       if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
-               struct evbuffer *buf = bufev->input;
-               event_del(&bufev->ev_read);
-
-               /* Now schedule a callback for us when the buffer changes */
-               evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
-       }
-
-       /* Invoke the user callback - must always be called last */
-       if (bufev->readcb != NULL)
-               (*bufev->readcb)(bufev, bufev->cbarg);
+       bufferevent_read_closure(bufev, progress);
        return;
 
  reschedule:
@@ -266,6 +313,9 @@ bufferevent_new(evutil_socket_t fd, evbuffercb readcb, evbuffercb writecb,
         */
        bufev->enabled = EV_WRITE;
 
+       TAILQ_INIT(&bufev->input_filters);
+       TAILQ_INIT(&bufev->output_filters);
+
        return (bufev);
 }
 
@@ -283,6 +333,8 @@ bufferevent_setcb(struct bufferevent *bufev,
 void
 bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
 {
+       struct bufferevent_filter *filter;
+
        event_del(&bufev->ev_read);
        event_del(&bufev->ev_write);
 
@@ -293,6 +345,21 @@ bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd)
                event_base_set(bufev->ev_base, &bufev->ev_write);
        }
 
+       /* we need to free all filter contexts and then init them again */
+       TAILQ_FOREACH(filter, &bufev->input_filters, next) {
+               if (filter->free_context)
+                       filter->free_context(filter->ctx);
+               if (filter->init_context)
+                       filter->init_context(filter->ctx);
+       }
+
+       TAILQ_FOREACH(filter, &bufev->output_filters, next) {
+               if (filter->free_context)
+                       filter->free_context(filter->ctx);
+               if (filter->init_context)
+                       filter->init_context(filter->ctx);
+       }
+
        /* might have to manually trigger event registration */
 }
 
@@ -305,7 +372,9 @@ bufferevent_input(struct bufferevent *bufev)
 struct evbuffer *
 bufferevent_output(struct bufferevent *bufev)
 {
-       return (bufev->output);
+       return TAILQ_FIRST(&bufev->output_filters) != NULL ?
+           TAILQ_FIRST(&bufev->output_filters)->buffer :
+           bufev->output;
 }
 
 int
@@ -324,21 +393,55 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority)
 void
 bufferevent_free(struct bufferevent *bufev)
 {
+       struct bufferevent_filter *filter;
+
        event_del(&bufev->ev_read);
        event_del(&bufev->ev_write);
 
        evbuffer_free(bufev->input);
        evbuffer_free(bufev->output);
 
+       /* free input and output filters */
+       while ((filter = TAILQ_FIRST(&bufev->input_filters)) != NULL) {
+               bufferevent_filter_remove(bufev, BEV_INPUT, filter);
+
+               bufferevent_filter_free(filter);
+       }
+       
+       while ((filter = TAILQ_FIRST(&bufev->output_filters)) != NULL) {
+               bufferevent_filter_remove(bufev, BEV_OUTPUT, filter);
+
+               bufferevent_filter_free(filter);
+       }
+
        mm_free(bufev);
 }
-
-static inline void
+/*
+ * Executes filters on the written data and schedules a network write if
+ * necessary.
+ */
+static inline int
 bufferevent_write_closure(struct bufferevent *bufev, int progress)
 {
+       /* if no data was written, we do not need to do anything */
+       if (!progress)
+               return (0);
+
+       if (TAILQ_FIRST(&bufev->output_filters) != NULL) {
+               progress = bufferevent_process_filters(
+                       TAILQ_FIRST(&bufev->output_filters),
+                       bufev->output, BEV_NORMAL);
+               if (progress == -1) {
+                       (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
+                       return (-1);
+               }
+       }
+
        /* If everything is okay, we need to schedule a write */
-       if (progress && (bufev->enabled & EV_WRITE))
+       if (bufev->enabled & EV_WRITE)
                bufferevent_add(&bufev->ev_write, bufev->timeout_write);
+
+       return (0);
 }
 
 /*
@@ -349,25 +452,34 @@ bufferevent_write_closure(struct bufferevent *bufev, int progress)
 int
 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
 {
-       if (evbuffer_add(bufev->output, data, size) == -1)
-               return (-1);
+       struct evbuffer *output;
 
-       bufferevent_write_closure(bufev, size > 0);
+       if (TAILQ_FIRST(&bufev->output_filters) != NULL)
+               output = TAILQ_FIRST(&bufev->output_filters)->buffer;
+       else
+               output = bufev->output;
 
-       return (0);
+       if (evbuffer_add(output, data, size) == -1)
+               return (-1);
+
+       return (bufferevent_write_closure(bufev, size > 0));
 }
 
 int
 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
 {
-       int len = EVBUFFER_LENGTH(bufev->output);
+       int len = EVBUFFER_LENGTH(buf);
+       struct evbuffer *output;
 
-       if (evbuffer_add_buffer(bufev->output, buf) == -1)
-               return (-1);
+       if (TAILQ_FIRST(&bufev->output_filters) != NULL)
+               output = TAILQ_FIRST(&bufev->output_filters)->buffer;
+       else
+               output = bufev->output;
 
-       bufferevent_write_closure(bufev, len > 0);
+       if (evbuffer_add_buffer(output, buf) == -1)
+               return (-1);
        
-       return (0);
+       return (bufferevent_write_closure(bufev, len > 0));
 }
 
 size_t
@@ -462,3 +574,156 @@ bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
        res = event_base_set(base, &bufev->ev_write);
        return (res);
 }
+
+/*
+ * Filtering stuff
+ */
+
+struct bufferevent_filter *
+bufferevent_filter_new(
+       void (*init_context)(void *ctx),
+       void (*free_context)(void *ctx),
+       enum bufferevent_filter_result (*process)(
+               struct evbuffer *src, struct evbuffer *dst,
+               enum bufferevent_filter_state flags, void *ctx), void *ctx)
+{
+       struct bufferevent_filter *filter;
+
+       if ((filter = mm_malloc(sizeof(struct bufferevent_filter))) == NULL)
+               return (NULL);
+
+       if ((filter->buffer = evbuffer_new()) == NULL) {
+               mm_free(filter);
+               return (NULL);
+       }
+
+       filter->init_context = init_context;
+       filter->free_context = free_context;
+       filter->process = process;
+       filter->ctx = ctx;
+
+       return (filter);
+}
+
+void
+bufferevent_filter_free(struct bufferevent_filter *filter)
+{
+       evbuffer_free(filter->buffer);
+       mm_free(filter);
+}
+
+void
+bufferevent_filter_insert(struct bufferevent *bufev,
+    enum bufferevent_filter_type filter_type,
+    struct bufferevent_filter *filter)
+{
+       switch (filter_type) {
+       case BEV_INPUT:
+               TAILQ_INSERT_TAIL(&bufev->input_filters, filter, next);
+               break;
+       case BEV_OUTPUT:
+               TAILQ_INSERT_HEAD(&bufev->output_filters, filter, next);
+               break;
+       default:
+               event_errx(1, "illegal filter type %d", filter_type);
+       }
+
+       if (filter->init_context)
+               filter->init_context(filter->ctx);
+}
+
+void
+bufferevent_filter_remove(struct bufferevent *bufev,
+    enum bufferevent_filter_type filter_type,
+    struct bufferevent_filter *filter)
+{
+       switch (filter_type) {
+       case BEV_INPUT:
+               TAILQ_REMOVE(&bufev->input_filters, filter, next);
+               break;
+       case BEV_OUTPUT:
+               TAILQ_REMOVE(&bufev->output_filters, filter, next);
+               break;
+       default:
+               event_errx(1, "illegal filter type %d", filter_type);
+       }
+
+       evbuffer_drain(filter->buffer, -1);
+
+       if (filter->free_context)
+               filter->free_context(filter->ctx);
+               
+}
+
+static int
+bufferevent_process_filters(
+       struct bufferevent_filter *filter, struct evbuffer *final,
+       enum bufferevent_filter_state state)
+{
+       struct evbuffer *src, *dst;
+       struct bufferevent_filter *next;
+       int len = EVBUFFER_LENGTH(final);
+
+       for (; filter != NULL; filter = next) {
+               int res;
+
+               next = TAILQ_NEXT(filter, next);
+               src = filter->buffer;
+               dst = next != NULL ? next->buffer : final;
+
+               res = (*filter->process)(src, dst, state, filter->ctx);
+
+               /* an error causes complete termination of the bufferevent */
+               if (res == BEV_ERROR)
+                       return (-1);
+
+               /* a read filter indicated that it cannot produce
+                * further data, we do not need to invoke any
+                * subsequent filters. Unless, a flush or something
+                * similar was specifically requested.
+                */
+               if (res == BEV_NEED_MORE && state == BEV_NORMAL)
+                       return (0);
+       }
+
+       /* we made user visible progress if the buffer size changed */
+       return (EVBUFFER_LENGTH(final) != len);
+}
+
+int
+bufferevent_trigger_filter(struct bufferevent *bufev,
+    struct bufferevent_filter *filter, int iotype,
+    enum bufferevent_filter_state state)
+{
+       struct evbuffer *dst = iotype == BEV_INPUT ?
+           bufev->input : bufev->output;
+       int progress;
+
+       /* trigger all filters if filter is not specified */
+       if (filter == NULL) {
+               struct bufferevent_filterq *head = BEV_INPUT ?
+                   &bufev->input_filters : &bufev->output_filters;
+               filter = TAILQ_FIRST(head);
+       }
+
+       progress = bufferevent_process_filters(filter, dst, state);
+       if (progress == -1) {
+               (*bufev->errorcb)(bufev, EVBUFFER_ERROR, bufev->cbarg);
+               return (-1);
+       }
+
+       switch (iotype) {
+       case BEV_INPUT:
+               bufferevent_read_closure(bufev, progress);
+               break;
+       case BEV_OUTPUT:
+               if (progress && (bufev->enabled & EV_WRITE))
+                       bufferevent_add(
+                               &bufev->ev_write, bufev->timeout_write);
+               break;
+       default:
+               event_errx(1, "Illegal bufferevent iotype: %d", iotype);
+       }
+
+       return (0);
+}
index 5ef4e8bc48bb95a594b187b6330c082df0aef1f4..3a630805950deba783424c667ad3b4b1692a560a 100644 (file)
@@ -228,6 +228,9 @@ struct evbuffer *bufferevent_input(struct bufferevent *bufev);
 /**
    Returns the outut buffer.
 
+   When filters are being used, the filters need to be manually
+   triggered if the output buffer was manipulated.
+
    @param bufev the buffervent from which to get the evbuffer
    @return the evbuffer object for the output buffer
  */
@@ -244,7 +247,6 @@ struct evbuffer *bufferevent_output(struct bufferevent *bufev);
  */
 int bufferevent_enable(struct bufferevent *bufev, short event);
 
-
 /**
   Disable a bufferevent.
 
@@ -288,6 +290,147 @@ void bufferevent_setwatermark(struct bufferevent *bufev, short events,
 #define EVBUFFER_INPUT(x)      bufferevent_input(x)
 #define EVBUFFER_OUTPUT(x)     bufferevent_output(x)
 
+/**
+   Support for filtering input and output of bufferevents.
+ */
+
+/**
+   Flags that can be passed into filters to let them know how to
+   deal with the incoming data.
+*/
+enum bufferevent_filter_state {
+       /** usually set when processing data */
+       BEV_NORMAL = 0,
+
+       /** encountered EOF on read or done sending data */
+       BEV_FLUSH = 1,
+};
+
+/**
+   Values that filters can return.
+ */
+enum bufferevent_filter_result {
+       /** everything is okay */
+       BEV_OK = 0,
+
+       /** the filter needs to read more data before output */
+       BEV_NEED_MORE = 1,
+
+       /** the filter enountered a critical error, no further data
+           can be processed. */
+       BEV_ERROR = 2
+};
+
+struct bufferevent_filter;
+
+/**
+  Creates a new filtering object for a bufferevent.
+
+  Filters can be used to implement compression, authentication, rate limiting,
+  etc. for bufferevents.  Filters can be associated with the input or output
+  path or both.   Filters need to be inserted with bufferevent_filter_insert()
+  on either the input or output path.
+
+  For example, when implementing compression, both an input and an
+  output filters are required.   The output filter compress all output
+  as it passes along whereas the input filter decompresses all input as
+  it is being read from the network.
+
+  Some filters may require specificaly behavior such as flushing their buffers
+  on EOF.   To allom them to do that, a bufferevent will invoke the filter
+  with BEV_FLUSH to let it know that EOF has been reached.
+
+  When a filter needs more data before it can output any data, it may return
+  BEV_NEED_MORE in which case the filter chain is being interrupted until
+  more data arrives.   A filter can indicate a fatal error by returning
+  BEV_ERROR.  Otherwise, it should return BEV_OK.
+
+  @param init_context an optional function that initializes the ctx parameter.
+  @param free_context an optional function to free memory associated with the
+         ctx parameter.
+  @param process the filtering function that should be invokved either during
+         input or output depending on where the filter should be attached.
+  @param ctx additional context that can be passed to the process function
+  @return a bufferevent_filter object that can subsequently be installed
+*/
+struct bufferevent_filter *bufferevent_filter_new(
+       void (*init_context)(void *),
+       void (*free_context)(void *),
+       enum bufferevent_filter_result (*process)(
+               struct evbuffer *src, struct evbuffer *dst,
+               enum bufferevent_filter_state state, void *ctx), void *ctx);
+
+/**
+   Frees the filter object.
+
+   It must have been removed from the bufferevent before it can be freed.
+
+   @param filter the filter to be freed
+   @see bufferevent_filter_remove()
+*/
+void bufferevent_filter_free(struct bufferevent_filter *filter);
+
+/** Filter types for inserting or removing filters */
+enum bufferevent_filter_type {
+       /** filter is being used for input */
+       BEV_INPUT = 0,
+
+       /** filter is being used for output */
+       BEV_OUTPUT = 1
+};
+
+/**
+   Inserts a filter into the processing of data for bufferevent.
+
+   A filter can be inserted only once.  It can not be used again for
+   another insert unless it have been removed via
+   bufferevent_filter_remove() first.
+
+   Input filters are inserted at the end, output filters at the
+   beginning of the queue.
+
+   @param bufev the bufferevent object into which to install the filter
+   @param filter_type either BEV_INPUT or BEV_OUTPUT
+   @param filter the filter object
+   @see bufferevent_filter_remove()
+ */
+void bufferevent_filter_insert(struct bufferevent *bufev,
+    enum bufferevent_filter_type filter_type,
+    struct bufferevent_filter *filter);
+
+/**
+   Removes a filter from the bufferevent.
+
+   A filter should be flushed via buffervent_trigger_filter before removing
+   it from a bufferevent.  Any remaining intermediate buffer data is going
+   to be lost.
+
+   @param bufev the bufferevent object from which to remove the filter
+   @param filter_type either BEV_INPUT or BEV_OUTPUT
+   @param filter the filter object or NULL to trigger all filters
+   @see bufferevent_trigger_filter()
+*/
+void bufferevent_filter_remove(struct bufferevent *bufev,
+    enum bufferevent_filter_type filter_type,
+    struct bufferevent_filter *filter);
+
+/**
+  Triggers the filter chain the specified filter to produce more
+  data is possible.  This is primarily for time-based filters such
+  as rate-limiting to produce more data as time passes.
+
+  @param bufev the bufferevent object to which the filter belongs
+  @param filter the bufferevent filter at which to start
+  @param iotype either BEV_INPUT or BEV_OUTPUT depending on where the filter
+        was installed
+  @param state either BEV_NORMAL or BEV_FLUSH
+  @return -1 on failure, 0 if no data was produces, 1 if data was produced
+ */
+
+int
+bufferevent_trigger_filter(struct bufferevent *bufev,
+    struct bufferevent_filter *filter, int iotype,
+    enum bufferevent_filter_state state);
 
 #ifdef __cplusplus
 }
index f35ba2a2dfeafb958e59a37dcd438bd534d52c64..8616a9a66362801e98ac08e80544ff03cf1e0008 100644 (file)
@@ -64,6 +64,25 @@ struct event_watermark {
        size_t high;
 };
 
+struct bufferevent_filter;
+
+/* Fix so that ppl dont have to run with <sys/queue.h> */
+#ifndef TAILQ_HEAD
+#define _EVENT_DEFINED_TQHEAD
+#define TAILQ_HEAD(name, type)                                         \
+struct name {                                                          \
+       struct type *tqh_first; /* first element */                     \
+       struct type **tqh_last; /* addr of last next element */         \
+}
+#endif /* !TAILQ_HEAD */
+
+TAILQ_HEAD(bufferevent_filterq, bufferevent_filter);
+
+#ifdef _EVENT_DEFINED_TQHEAD
+#undef TAILQ_HEAD
+#undef _EVENT_DEFINED_TQHEAD
+#endif /* _EVENT_DEFINED_TQHEAD */
+
 struct bufferevent {
        struct event_base *ev_base;
 
@@ -85,8 +104,11 @@ struct bufferevent {
        int timeout_write;      /* in seconds */
 
        short enabled;  /* events that are currently enabled */
-};
 
+       /** the list of input and output filters */
+       struct bufferevent_filterq input_filters;
+       struct bufferevent_filterq output_filters;
+};
 
 #ifdef __cplusplus
 }
index 55d686095f5ab4aa0ab7bcc9e75b115e08432e3d..e73d5dd2f4da20252f9982d011ad79660977ba68 100644 (file)
@@ -1443,6 +1443,101 @@ test_bufferevent_watermarks(void)
        cleanup_test();
 }
 
+/*
+ * Test bufferevent filters
+ */
+
+/* strip an 'x' from each byte */
+
+static enum bufferevent_filter_result
+bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
+    enum bufferevent_filter_state state, void *ctx)
+{
+       const unsigned char *buffer;
+       int i;
+
+       if (state == BEV_FREE_DATA)
+               return (BEV_OK);
+
+       buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
+       for (i = 0; i < EVBUFFER_LENGTH(src); i += 2) {
+               assert(buffer[i] == 'x');
+               evbuffer_add(dst, buffer + i + 1, 1);
+
+               if (i + 2 > EVBUFFER_LENGTH(src))
+                       break;
+       }
+
+       evbuffer_drain(src, i);
+       return (BEV_OK);
+}
+
+/* add an 'x' before each byte */
+
+static enum bufferevent_filter_result
+bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
+    enum bufferevent_filter_state state, void *ctx)
+{
+       const unsigned char *buffer;
+       int i;
+
+       if (state == BEV_FREE_DATA)
+               return (BEV_OK);
+
+       buffer = evbuffer_pullup(src, EVBUFFER_LENGTH(src));
+       for (i = 0; i < EVBUFFER_LENGTH(src); ++i) {
+               evbuffer_add(dst, "x", 1);
+               evbuffer_add(dst, buffer + i, 1);
+       }
+
+       evbuffer_drain(src, EVBUFFER_LENGTH(src));
+       return (BEV_OK);
+}
+
+static void
+test_bufferevent_filters(void)
+{
+       struct bufferevent *bev1, *bev2;
+       struct bufferevent_filter *finput, *foutput;
+       char buffer[8333];
+       int i;
+
+       setup_test("Bufferevent Filters: ");
+
+       bev1 = bufferevent_new(pair[0], NULL, writecb, errorcb, NULL);
+       bev2 = bufferevent_new(pair[1], readcb, NULL, errorcb, NULL);
+
+       bufferevent_disable(bev1, EV_READ);
+       bufferevent_enable(bev2, EV_READ);
+
+       for (i = 0; i < sizeof(buffer); i++)
+               buffer[i] = i;
+
+       /* insert some filters */
+       finput = bufferevent_filter_new(
+               NULL, NULL,bufferevent_input_filter, NULL);
+       foutput = bufferevent_filter_new(
+               NULL, NULL, bufferevent_output_filter, NULL);
+
+       bufferevent_filter_insert(bev1, BEV_OUTPUT, foutput);
+       bufferevent_filter_insert(bev2, BEV_INPUT, finput);
+
+       bufferevent_write(bev1, buffer, sizeof(buffer));
+
+       event_dispatch();
+
+       bufferevent_filter_remove(bev1, BEV_OUTPUT, foutput);
+       bufferevent_filter_free(foutput);
+
+       bufferevent_free(bev1);
+       bufferevent_free(bev2);
+
+       if (test_ok != 2)
+               test_ok = 0;
+
+       cleanup_test();
+}
+
 struct test_pri_event {
        struct event ev;
        int count;
@@ -1944,6 +2039,7 @@ main (int argc, char **argv)
        
        test_bufferevent();
        test_bufferevent_watermarks();
+       test_bufferevent_filters();
 
        test_free_active_base();