]> granicus.if.org Git - libevent/commitdiff
fix a bug where rpc would not be scheduled when they were queued; test for it.
authorNiels Provos <provos@gmail.com>
Wed, 22 Nov 2006 06:54:28 +0000 (06:54 +0000)
committerNiels Provos <provos@gmail.com>
Wed, 22 Nov 2006 06:54:28 +0000 (06:54 +0000)
allow a configurable timeout for connections and RPCs.

svn:r274

evhttp.h
evrpc-internal.h
evrpc.c
evrpc.h
http-internal.h
http.c
test/regress_rpc.c

index f31a013c17bbcf53ec84106c9619243cb5788306..cb5330aebf039603653bb689dbfca80f24900c93 100644 (file)
--- a/evhttp.h
+++ b/evhttp.h
@@ -150,6 +150,10 @@ struct evhttp_connection *evhttp_connection_new(
 /* Frees an http connection */
 void evhttp_connection_free(struct evhttp_connection *evcon);
 
+/* Sets the timeout for events related to this connection */
+void evhttp_connection_set_timeout(struct evhttp_connection *evcon,
+    int timeout_in_secs);
+
 /* The connection gets ownership of the request */
 int evhttp_make_request(struct evhttp_connection *evcon,
     struct evhttp_request *req,
index de2ab47dbeb11fb133a19df8f5927e6d914eab77..656533b603ef976affeb1fa9e61dcf10ea39a6ea 100644 (file)
@@ -48,6 +48,8 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
 struct evrpc_pool {
        struct evconq connections;
 
+       int timeout;
+
        TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests;
 };
 
diff --git a/evrpc.c b/evrpc.c
index bf66c181d3d5f7388073dae840a1a4b34614b5eb..d575cd14adad88a621a78481f3a21a031a11a899 100644 (file)
--- a/evrpc.c
+++ b/evrpc.c
@@ -85,7 +85,8 @@ evrpc_free(struct evrpc_base *base)
 
 }
 
-void evrpc_request_cb(struct evhttp_request *, void *);
+static void evrpc_pool_schedule(struct evrpc_pool *pool);
+static void evrpc_request_cb(struct evhttp_request *, void *);
 void evrpc_request_done(struct evrpc_req_generic*);
 
 /*
@@ -132,7 +133,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
        return (0);
 }
 
-void
+static void
 evrpc_request_cb(struct evhttp_request *req, void *arg)
 {
        struct evrpc *rpc = arg;
@@ -244,6 +245,8 @@ evrpc_pool_new()
        TAILQ_INIT(&pool->connections);
        TAILQ_INIT(&pool->requests);
 
+       pool->timeout = -1;
+
        return (pool);
 }
 
@@ -285,6 +288,13 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
        assert(connection->http_server == NULL);
        TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 
+       /* 
+        * unless a timeout was specifically set for a connection,
+        * the connection inherits the timeout from the pool.
+        */
+       if (connection->timeout == -1)
+               connection->timeout = pool->timeout;
+
        /* 
         * if we have any requests pending, schedule them with the new
         * connections.
@@ -298,8 +308,19 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
        }
 }
 
+void
+evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
+{
+       struct evhttp_connection *evcon;
+       TAILQ_FOREACH(evcon, &pool->connections, next) {
+               evcon->timeout = timeout_in_secs;
+       }
+       pool->timeout = timeout_in_secs;
+}
+
 
 static void evrpc_reply_done(struct evhttp_request *, void *);
+static void evrpc_request_timeout(int, short, void *);
 
 /*
  * Finds a connection object associated with the pool that is currently
@@ -325,6 +346,7 @@ evrpc_schedule_request(struct evhttp_connection *connection,
     struct evrpc_request_wrapper *ctx)
 {
        struct evhttp_request *req = NULL;
+       struct evrpc_pool *pool = ctx->pool;
        char *uri = NULL;
        int res = 0;
 
@@ -338,6 +360,19 @@ evrpc_schedule_request(struct evhttp_connection *connection,
        if (uri == NULL)
                goto error;
 
+       /* we need to know the connection that we might have to abort */
+       ctx->evcon = connection;
+
+       if (pool->timeout > 0) {
+               /* 
+                * a timeout after which the whole rpc is going to be aborted.
+                */
+               struct timeval tv;
+               timerclear(&tv);
+               tv.tv_sec = pool->timeout;
+               evtimer_add(&ctx->ev_timeout, &tv);
+       }
+
        /* start the request over the connection */
        res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
        free(uri);
@@ -357,24 +392,21 @@ int
 evrpc_make_request(struct evrpc_request_wrapper *ctx)
 {
        struct evrpc_pool *pool = ctx->pool;
-       struct evhttp_connection *connection;
+
+       /* initialize the event structure for this rpc */
+       evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
 
        /* we better have some available connections on the pool */
        assert(TAILQ_FIRST(&pool->connections) != NULL);
 
-
-       /* even if a connection might be available, we do FIFO */
-       if (TAILQ_FIRST(&pool->requests) == NULL) {
-               connection = evrpc_pool_find_connection(pool);
-               if (connection != NULL)
-                       return evrpc_schedule_request(connection, ctx);
-       }
-
        /* 
         * if no connection is available, we queue the request on the pool,
         * the next time a connection is empty, the rpc will be send on that.
         */
        TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
+
+       evrpc_pool_schedule(pool);
+
        return (0);
 }
 
@@ -382,10 +414,15 @@ static void
 evrpc_reply_done(struct evhttp_request *req, void *arg)
 {
        struct evrpc_request_wrapper *ctx = arg;
-       int res;
+       struct evrpc_pool *pool = ctx->pool;
+       int res = -1;
+       
+       /* cancel any timeout we might have scheduled */
+       event_del(&ctx->ev_timeout);
 
        /* we need to get the reply now */
-       res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
+       if (req != NULL)
+               res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
        if (res == -1) {
                /* clear everything that we might have written previously */
                ctx->reply_clear(ctx->reply);
@@ -396,4 +433,33 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
        evrpc_request_wrapper_free(ctx);
 
        /* the http layer owns the request structure */
+
+       /* see if we can schedule another request */
+       evrpc_pool_schedule(pool);
+}
+
+static void
+evrpc_pool_schedule(struct evrpc_pool *pool)
+{
+       struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
+       struct evhttp_connection *evcon;
+
+       /* if no requests are pending, we have no work */
+       if (ctx == NULL)
+               return;
+
+       if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
+               TAILQ_REMOVE(&pool->requests, ctx, next);
+               evrpc_schedule_request(evcon, ctx);
+       }
+}
+
+static void
+evrpc_request_timeout(int fd, short what, void *arg)
+{
+       struct evrpc_request_wrapper *ctx = arg;
+       struct evhttp_connection *evcon = ctx->evcon;
+       assert(evcon != NULL);
+
+       evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
 }
diff --git a/evrpc.h b/evrpc.h
index 4640d0ba562ce015c1be6d19718f954c7311c1dc..5f2dfd67496469988860f951ba28f49fc950de7b 100644 (file)
--- a/evrpc.h
+++ b/evrpc.h
@@ -154,6 +154,7 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
                return (-1);                                        \
        }                                                           \
        ctx->pool = pool;                                           \
+       ctx->evcon = NULL;                                          \
        ctx->name = strdup(#rpcname);                               \
        if (ctx->name == NULL) {                                    \
                free(ctx);                                          \
@@ -228,6 +229,12 @@ struct evrpc_request_wrapper {
         /* pool on which this rpc request is being made */
         struct evrpc_pool *pool;
 
+        /* connection on which the request is being sent */
+       struct evhttp_connection *evcon;
+
+       /* event for implementing request timeouts */
+       struct event ev_timeout;
+
        /* the name of the rpc */
        char *name;
 
@@ -262,4 +269,17 @@ void evrpc_pool_free(struct evrpc_pool *);
 void evrpc_pool_add_connection(struct evrpc_pool *, 
     struct evhttp_connection *);
 
+/*
+ * Sets the timeout in secs after which a request has to complete.  The
+ * RPC is completely aborted if it does not complete by then.  Setting
+ * the timeout to 0 means that it never timeouts and can be used to
+ * implement callback type RPCs.
+ *
+ * Any connection already in the pool will be updated with the new
+ * timeout.  Connections added to the pool after set_timeout has be
+ * called receive the pool timeout only if no timeout has been set
+ * for the connection itself.
+ */
+void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
+
 #endif /* _EVRPC_H_ */
index b8ef6639b5e3e9fa556b42bb7c8f39708d537729..f95a2090ddca335420e6e68f175e2ce28703c4f5 100644 (file)
@@ -50,6 +50,8 @@ struct evhttp_connection {
        int flags;
 #define EVHTTP_CON_INCOMING    0x0001  /* only one request on it ever */
 #define EVHTTP_CON_OUTGOING    0x0002  /* multiple requests possible */
+
+       int timeout;                    /* timeout in seconds for events */
        
        enum evhttp_connection_state state;
 
diff --git a/http.c b/http.c
index a330028bdee3728f7e8f29a04ae2731675fcb59c..1711eec8a0206727f0f40620f5e52d5ef485bc45 100644 (file)
--- a/http.c
+++ b/http.c
@@ -218,12 +218,24 @@ evhttp_method(enum evhttp_cmd_type type)
        return (method);
 }
 
+static void
+evhttp_add_event(struct event *ev, int timeout, int default_timeout)
+{
+       if (timeout != 0) {
+               struct timeval tv;
+               
+               timerclear(&tv);
+               tv.tv_sec = timeout != -1 ? timeout : default_timeout;
+               event_add(ev, &tv);
+       } else {
+               event_add(ev, NULL);
+       }
+}
+
 void
 evhttp_write_buffer(struct evhttp_connection *evcon,
     void (*cb)(struct evhttp_connection *, void *), void *arg)
 {
-       struct timeval tv;
-
        event_debug(("%s: preparing to write buffer\n", __func__));
 
        /* Set call back */
@@ -232,9 +244,7 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
 
        /* xxx: maybe check if the event is still pending? */
        event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
-       timerclear(&tv);
-       tv.tv_sec = HTTP_WRITE_TIMEOUT;
-       event_add(&evcon->ev, &tv);
+       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
 }
 
 /*
@@ -464,7 +474,6 @@ void
 evhttp_write(int fd, short what, void *arg)
 {
        struct evhttp_connection *evcon = arg;
-       struct timeval tv;
        int n;
 
        if (what == EV_TIMEOUT) {
@@ -486,9 +495,8 @@ evhttp_write(int fd, short what, void *arg)
        }
 
        if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
-               timerclear(&tv);
-               tv.tv_sec = HTTP_WRITE_TIMEOUT;
-               event_add(&evcon->ev, &tv);
+               evhttp_add_event(&evcon->ev, 
+                   evcon->timeout, HTTP_WRITE_TIMEOUT);
                return;
        }
 
@@ -549,7 +557,6 @@ evhttp_read(int fd, short what, void *arg)
 {
        struct evhttp_connection *evcon = arg;
        struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
-       struct timeval tv;
        int n;
 
        if (what == EV_TIMEOUT) {
@@ -574,10 +581,8 @@ evhttp_read(int fd, short what, void *arg)
                evhttp_connection_done(evcon);
                return;
        }
-       
-       timerclear(&tv);
-       tv.tv_sec = HTTP_READ_TIMEOUT;
-       event_add(&evcon->ev, &tv);
+
+       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
 }
 
 void
@@ -971,7 +976,6 @@ evhttp_parse_lines(struct evhttp_request *req, struct evbuffer* buffer)
 void
 evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
 {
-       struct timeval tv;
        const char *content_length;
        const char *connection;
        struct evkeyvalq *headers = req->input_headers;
@@ -1013,16 +1017,12 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
        }
 
        event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
-       timerclear(&tv);
-       tv.tv_sec = HTTP_READ_TIMEOUT;
-       event_add(&evcon->ev, &tv);
-       return;
+       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
 }
 
 void
 evhttp_read_header(int fd, short what, void *arg)
 {
-       struct timeval tv;
        struct evhttp_connection *evcon = arg;
        struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
        int n, res;
@@ -1053,9 +1053,8 @@ evhttp_read_header(int fd, short what, void *arg)
                return;
        } else if (res == 0) {
                /* Need more header lines */
-               timerclear(&tv);
-               tv.tv_sec = HTTP_READ_TIMEOUT;
-               event_add(&evcon->ev, &tv);
+               evhttp_add_event(&evcon->ev, 
+                   evcon->timeout, HTTP_READ_TIMEOUT);
                return;
        }
 
@@ -1105,6 +1104,8 @@ evhttp_connection_new(const char *address, unsigned short port)
        evcon->fd = -1;
        evcon->port = port;
 
+       evcon->timeout = -1;
+
        if ((evcon->address = strdup(address)) == NULL) {
                event_warn("%s: strdup failed", __func__);
                goto error;
@@ -1131,11 +1132,16 @@ evhttp_connection_new(const char *address, unsigned short port)
        return (NULL);
 }
 
+void
+evhttp_connection_set_timeout(struct evhttp_connection *evcon,
+    int timeout_in_secs)
+{
+       evcon->timeout = timeout_in_secs;
+}
+
 int
 evhttp_connection_connect(struct evhttp_connection *evcon)
 {
-       struct timeval tv;
-       
        if (evcon->state == EVCON_CONNECTING)
                return (0);
        
@@ -1154,9 +1160,7 @@ evhttp_connection_connect(struct evhttp_connection *evcon)
 
        /* Set up a callback for successful connection setup */
        event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon);
-       timerclear(&tv);
-       tv.tv_sec = HTTP_CONNECT_TIMEOUT;
-       event_add(&evcon->ev, &tv);
+       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT);
 
        evcon->state = EVCON_CONNECTING;
        
@@ -1217,16 +1221,12 @@ evhttp_make_request(struct evhttp_connection *evcon,
 void
 evhttp_start_read(struct evhttp_connection *evcon)
 {
-       struct timeval tv;
-
        /* Set up an event to read the headers */
        if (event_initialized(&evcon->ev))
                event_del(&evcon->ev);
        event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon);
-
-       timerclear(&tv);
-       tv.tv_sec = HTTP_READ_TIMEOUT;
-       event_add(&evcon->ev, &tv);
+       
+       evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
 }
 
 void
index 863b9571d0d636d35be7dfcae4181d7074235438..3db21e3828d1f9e1ef7936d14c629eabdc57eeef 100644 (file)
@@ -315,10 +315,39 @@ GotKillCb(struct msg *msg, struct kill *kill, void *arg)
                goto done;
 
        test_ok += 1;
+
 done:
        event_loopexit(NULL);
 }
 
+static void
+GotKillCbTwo(struct msg *msg, struct kill *kill, void *arg)
+{
+       char *weapon;
+       char *action;
+
+       if (EVTAG_GET(kill, weapon, &weapon) == -1) {
+               fprintf(stderr, "get weapon\n");
+               goto done;
+       }
+       if (EVTAG_GET(kill, action, &action) == -1) {
+               fprintf(stderr, "get action\n");
+               goto done;
+       }
+
+       if (strcmp(weapon, "dagger"))
+               goto done;
+
+       if (strcmp(action, "wave around like an idiot"))
+               goto done;
+
+       test_ok += 1;
+
+done:
+       if (test_ok == 2)
+               event_loopexit(NULL);
+}
+
 static void
 rpc_basic_client(void)
 {
@@ -374,10 +403,62 @@ rpc_basic_client(void)
        evhttp_free(http);
 }
 
+/* 
+ * We are testing that the second requests gets send over the same
+ * connection after the first RPCs completes.
+ */
+static void
+rpc_basic_queued_client(void)
+{
+       short port;
+       struct evhttp *http = NULL;
+       struct evrpc_base *base = NULL;
+       struct evrpc_pool *pool = NULL;
+       struct msg *msg;
+       struct kill *kill_one, *kill_two;
+
+       fprintf(stdout, "Testing RPC (Queued) Client: ");
+
+       rpc_setup(&http, &port, &base);
+
+       pool = rpc_pool_with_connection(port);
+
+       /* set up the basic message */
+       msg = msg_new();
+       EVTAG_ASSIGN(msg, from_name, "niels");
+       EVTAG_ASSIGN(msg, to_name, "tester");
+
+       kill_one = kill_new();
+       kill_two = kill_new();
+
+       EVRPC_MAKE_REQUEST(Message, msg, kill_one,  GotKillCbTwo, NULL);
+       EVRPC_MAKE_REQUEST(Message, msg, kill_two,  GotKillCb, NULL);
+
+       test_ok = 0;
+
+       event_dispatch();
+       
+       if (test_ok != 2) {
+               fprintf(stdout, "FAILED (1)\n");
+               exit(1);
+       }
+
+       fprintf(stdout, "OK\n");
+
+       msg_free(msg);
+       kill_free(kill_one);
+       kill_free(kill_two);
+
+       evrpc_pool_free(pool);
+       evhttp_free(http);
+}
+
+
 void
 rpc_suite(void)
 {
        rpc_basic_test();
        rpc_basic_message();
        rpc_basic_client();
+       rpc_basic_queued_client();
 }