]> granicus.if.org Git - libevent/commitdiff
provide hooks for outgoing pools; associate a base with a pool
authorNiels Provos <provos@gmail.com>
Sat, 3 Nov 2007 22:51:26 +0000 (22:51 +0000)
committerNiels Provos <provos@gmail.com>
Sat, 3 Nov 2007 22:51:26 +0000 (22:51 +0000)
svn:r468

ChangeLog
evrpc-internal.h
evrpc.c
evrpc.h
test/regress_rpc.c

index ff5a535e78282156624847e5a90b9b41a9b52ab1..4a819f5f98db1057311bf14c8c43c1bd5ec8db93 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -31,3 +31,4 @@ Changes in current version:
  o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa
  o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication.
  o use a min heap instead of a red-black tree for timeouts; as a result finding the min is a O(1) operation now; from Maxim Yegorushkin
+ o associate an event base with an rpc pool
index 8b8dd691fa1279a5b0e1fe9ee45368cb96f8034f..c900f959f973088ad389e79e05d791133b91bcc2 100644 (file)
@@ -41,16 +41,30 @@ struct evrpc_hook {
        void *process_arg;
 };
 
+TAILQ_HEAD(evrpc_hook_list, evrpc_hook);
+
+/*
+ * this is shared between the base and the pool, so that we can reuse
+ * the hook adding functions; we alias both evrpc_pool and evrpc_base
+ * to this common structure.
+ */
+struct _evrpc_hooks {
+       /* hooks for processing outbound and inbound rpcs */
+       struct evrpc_hook_list in_hooks;
+       struct evrpc_hook_list out_hooks;
+};
+
+#define input_hooks common.in_hooks
+#define output_hooks common.out_hooks
+
 struct evrpc_base {
+       struct _evrpc_hooks common;
+
        /* the HTTP server under which we register our RPC calls */
        struct evhttp* http_server;
 
        /* a list of all RPCs registered with us */
        TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;
-       
-       /* hooks for processing outbound and inbound rpcs */
-       TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
-       struct evrpc_hook_list output_hooks;
 };
 
 struct evrpc_req_generic;
@@ -58,6 +72,10 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
 
 /* A pool for holding evhttp_connection objects */
 struct evrpc_pool {
+       struct _evrpc_hooks common;
+
+       struct event_base *base;
+
        struct evconq connections;
 
        int timeout;
diff --git a/evrpc.c b/evrpc.c
index b1fb4765376969e0fbf83c0ce615cba77b374c2f..6b5138d2be759bc7446870b6c05f68c416aa58e8 100644 (file)
--- a/evrpc.c
+++ b/evrpc.c
@@ -100,19 +100,20 @@ evrpc_free(struct evrpc_base *base)
 }
 
 void *
-evrpc_add_hook(struct evrpc_base *base,
+evrpc_add_hook(void *vbase,
     enum EVRPC_HOOK_TYPE hook_type,
     int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
     void *cb_arg)
 {
+       struct _evrpc_hooks *base = vbase;
        struct evrpc_hook_list *head = NULL;
        struct evrpc_hook *hook = NULL;
        switch (hook_type) {
        case INPUT:
-               head = &base->input_hooks;
+               head = &base->in_hooks;
                break;
        case OUTPUT:
-               head = &base->output_hooks;
+               head = &base->out_hooks;
                break;
        default:
                assert(hook_type == INPUT || hook_type == OUTPUT);
@@ -128,37 +129,42 @@ evrpc_add_hook(struct evrpc_base *base,
        return (hook);
 }
 
+static int
+evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
+{
+       struct evrpc_hook *hook = NULL;
+       TAILQ_FOREACH(hook, head, next) {
+               if (hook == handle) {
+                       TAILQ_REMOVE(head, hook, next);
+                       free(hook);
+                       return (1);
+               }
+       }
+
+       return (0);
+}
+
 /*
  * remove the hook specified by the handle
  */
 
 int
-evrpc_remove_hook(struct evrpc_base *base,
-    enum EVRPC_HOOK_TYPE hook_type,
-    void *handle)
+evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
 {
+       struct _evrpc_hooks *base = vbase;
        struct evrpc_hook_list *head = NULL;
-       struct evrpc_hook *hook = NULL;
        switch (hook_type) {
        case INPUT:
-               head = &base->input_hooks;
+               head = &base->in_hooks;
                break;
        case OUTPUT:
-               head = &base->output_hooks;
+               head = &base->out_hooks;
                break;
        default:
                assert(hook_type == INPUT || hook_type == OUTPUT);
        }
 
-       TAILQ_FOREACH(hook, head, next) {
-               if (hook == handle) {
-                       TAILQ_REMOVE(head, hook, next);
-                       free(hook);
-                       return (1);
-               }
-       }
-
-       return (0);
+       return (evrpc_remove_hook_internal(head, handle));
 }
 
 static int
@@ -371,7 +377,7 @@ static int evrpc_schedule_request(struct evhttp_connection *connection,
     struct evrpc_request_wrapper *ctx);
 
 struct evrpc_pool *
-evrpc_pool_new(void)
+evrpc_pool_new(struct event_base *base)
 {
        struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
        if (pool == NULL)
@@ -380,6 +386,10 @@ evrpc_pool_new(void)
        TAILQ_INIT(&pool->connections);
        TAILQ_INIT(&pool->requests);
 
+       TAILQ_INIT(&pool->input_hooks);
+       TAILQ_INIT(&pool->output_hooks);
+
+       pool->base = base;
        pool->timeout = -1;
 
        return (pool);
@@ -397,6 +407,7 @@ evrpc_pool_free(struct evrpc_pool *pool)
 {
        struct evhttp_connection *connection;
        struct evrpc_request_wrapper *request;
+       struct evrpc_hook *hook;
 
        while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
                TAILQ_REMOVE(&pool->requests, request, next);
@@ -409,6 +420,14 @@ evrpc_pool_free(struct evrpc_pool *pool)
                evhttp_connection_free(connection);
        }
 
+       while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
+               assert(evrpc_remove_hook(pool, INPUT, hook));
+       }
+
+       while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
+               assert(evrpc_remove_hook(pool, OUTPUT, hook));
+       }
+
        free(pool);
 }
 
@@ -423,6 +442,12 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
        assert(connection->http_server == NULL);
        TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 
+       /*
+        * associate an event base with this connection
+        */
+       if (pool->base != NULL)
+               evhttp_connection_set_base(connection, pool->base);
+
        /* 
         * unless a timeout was specifically set for a connection,
         * the connection inherits the timeout from the pool.
@@ -499,6 +524,11 @@ evrpc_schedule_request(struct evhttp_connection *connection,
        /* we need to know the connection that we might have to abort */
        ctx->evcon = connection;
 
+       /* apply hooks to the outgoing request */
+       if (evrpc_process_hooks(&pool->output_hooks,
+               req, req->output_buffer) == -1)
+               goto error;
+
        if (pool->timeout > 0) {
                /* 
                 * a timeout after which the whole rpc is going to be aborted.
@@ -533,6 +563,8 @@ evrpc_make_request(struct evrpc_request_wrapper *ctx)
 
        /* initialize the event structure for this rpc */
        evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
+       if (pool->base != NULL)
+               event_base_set(pool->base, &ctx->ev_timeout);
 
        /* we better have some available connections on the pool */
        assert(TAILQ_FIRST(&pool->connections) != NULL);
@@ -564,13 +596,22 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
 
        /* we need to get the reply now */
        if (req != NULL) {
-               res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
-               if (res == -1) {
-                       status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
+               /* apply hooks to the incoming request */
+               if (evrpc_process_hooks(&pool->input_hooks,
+                       req, req->input_buffer) == -1) {
+                       status.error = EVRPC_STATUS_ERR_HOOKABORTED;
+                       res = -1;
+               } else {
+                       res = ctx->reply_unmarshal(ctx->reply,
+                           req->input_buffer);
+                       if (res == -1) {
+                               status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
+                       }
                }
        } else {
                status.error = EVRPC_STATUS_ERR_TIMEOUT;
        }
+
        if (res == -1) {
                /* clear everything that we might have written previously */
                ctx->reply_clear(ctx->reply);
diff --git a/evrpc.h b/evrpc.h
index 45911146e1deb8bd50730d59559eaca3bb59950a..99b24b0ae5d7d37166142e9353278c3c3e71275c 100644 (file)
--- a/evrpc.h
+++ b/evrpc.h
@@ -66,6 +66,7 @@ extern "C" {
  */
 
 struct evbuffer;
+struct event_base;
 struct evrpc_req_generic;
 
 /* Encapsulates a request */
@@ -260,6 +261,7 @@ struct evrpc_status {
 #define EVRPC_STATUS_ERR_TIMEOUT       1
 #define EVRPC_STATUS_ERR_BADPAYLOAD    2
 #define EVRPC_STATUS_ERR_UNSTARTED     3
+#define EVRPC_STATUS_ERR_HOOKABORTED   4
        int error;
 
        /* for looking at headers or other information */
@@ -307,8 +309,12 @@ int evrpc_make_request(struct evrpc_request_wrapper *);
  * a pool has a number of connections associated with it.
  * rpc requests are always made via a pool.
  */
-struct evrpc_pool *evrpc_pool_new(void);
+struct evrpc_pool *evrpc_pool_new(struct event_base *);
 void evrpc_pool_free(struct evrpc_pool *);
+/*
+ * adds a connection over which rpc can be dispatched.  the connection
+ * object must have been newly created.
+ */
 void evrpc_pool_add_connection(struct evrpc_pool *, 
     struct evhttp_connection *);
 
@@ -329,6 +335,8 @@ void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
  * Hooks for changing the input and output of RPCs; this can be used to
  * implement compression, authentication, encryption, ...
  *
+ * vbase may either be a pointer to struct evrpc_base or to struct evrpc_pool
+ * 
  * If a hook returns -1, the processing is aborted.
  *
  * The add functions return handles that can be used for removing hooks.
@@ -338,12 +346,12 @@ enum EVRPC_HOOK_TYPE {
        INPUT, OUTPUT
 };
 
-void *evrpc_add_hook(struct evrpc_base *base,
+void *evrpc_add_hook(void *vbase,
     enum EVRPC_HOOK_TYPE hook_type,
     int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
     void *cb_arg);
 
-int evrpc_remove_hook(struct evrpc_base *base,
+int evrpc_remove_hook(void *vbase,
     enum EVRPC_HOOK_TYPE hook_type,
     void *handle);
 
index 341cf9913c64ea0ffb68004c76e0b2bbe89b7e42..3fa11f79c4d95afbd2bddd6f812ef8f1fcfc5426 100644 (file)
@@ -325,7 +325,7 @@ rpc_pool_with_connection(short port)
        struct evhttp_connection *evcon;
        struct evrpc_pool *pool;
 
-       pool = evrpc_pool_new();
+       pool = evrpc_pool_new(NULL);
        assert(pool != NULL);
 
        evcon = evhttp_connection_new("127.0.0.1", port);
@@ -346,8 +346,8 @@ GotKillCb(struct evrpc_status *status,
        if (need_output_hook) {
                struct evhttp_request *req = status->http_req;
                const char *header = evhttp_find_header(
-                       req->input_headers, "X-Hook");
-               assert(strcmp(header, "output") == 0);
+                       req->input_headers, "X-Pool-Hook");
+               assert(strcmp(header, "ran") == 0);
        }
 
        if (status->error != EVRPC_STATUS_ERR_NONE)
@@ -418,6 +418,19 @@ rpc_hook_add_header(struct evhttp_request *req,
        return (0);
 }
 
+static int
+rpc_hook_remove_header(struct evhttp_request *req,
+    struct evbuffer *evbuf, void *arg)
+{
+       const char *header = evhttp_find_header(req->input_headers, "X-Hook");
+       assert(header != NULL);
+       assert(strcmp(header, arg) == 0);
+       evhttp_remove_header(req->input_headers, "X-Hook");
+       evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran");
+
+       return (0);
+}
+
 static void
 rpc_basic_client(void)
 {
@@ -442,6 +455,8 @@ rpc_basic_client(void)
 
        pool = rpc_pool_with_connection(port);
 
+       assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, "output"));
+
        /* set up the basic message */
        msg = msg_new();
        EVTAG_ASSIGN(msg, from_name, "niels");