]> granicus.if.org Git - libevent/commitdiff
finish RPC client support
authorNiels Provos <provos@gmail.com>
Mon, 20 Nov 2006 07:44:37 +0000 (07:44 +0000)
committerNiels Provos <provos@gmail.com>
Mon, 20 Nov 2006 07:44:37 +0000 (07:44 +0000)
svn:r269

evrpc.c
test/regress_rpc.c

diff --git a/evrpc.c b/evrpc.c
index 2b94d86a1dc0cf2f9780db80086c8d8d5e10a8be..e422ccb67f652ef93fe04f51cf9706639f33eb86 100644 (file)
--- a/evrpc.c
+++ b/evrpc.c
@@ -243,6 +243,13 @@ evrpc_pool_new()
        return (pool);
 }
 
+static void
+evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
+{
+       free(request->name);
+       free(request);
+}
+
 void
 evrpc_pool_free(struct evrpc_pool *pool)
 {
@@ -252,8 +259,7 @@ evrpc_pool_free(struct evrpc_pool *pool)
        while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
                TAILQ_REMOVE(&pool->requests, request, next);
                /* if this gets more complicated we need our own function */
-               free(request->name);
-               free(request);
+               evrpc_request_wrapper_free(request);
        }
 
        while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
@@ -264,6 +270,11 @@ evrpc_pool_free(struct evrpc_pool *pool)
        free(pool);
 }
 
+/*
+ * Add a connection to the RPC pool.   A request scheduled on the pool
+ * may use any available connection.
+ */
+
 void
 evrpc_pool_add_connection(struct evrpc_pool *pool,
     struct evhttp_connection *connection) {
@@ -271,7 +282,7 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
        TAILQ_INSERT_TAIL(&pool->connections, connection, next);
 
        /* 
-        * if we have any requests, pending schedule them with the new
+        * if we have any requests pending, schedule them with the new
         * connections.
         */
 
@@ -309,19 +320,32 @@ static int
 evrpc_schedule_request(struct evhttp_connection *connection,
     struct evrpc_request_wrapper *ctx)
 {
-       struct evbuffer *output;
-       struct evhttp_request *req;
-       if ((output = evbuffer_new()) == NULL)
-               goto error;
+       struct evhttp_request *req = NULL;
+       char *uri = NULL;
+       int res = 0;
 
        if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
                goto error;
 
+       /* serialize the request data into the output buffer */
+       ctx->request_marshal(req->output_buffer, ctx->request);
+
+       uri = evrpc_construct_uri(ctx->name);
+       if (uri == NULL)
+               goto error;
+
+       /* start the request over the connection */
+       res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
+       free(uri);
+
+       if (res == -1)
+               goto error;
+
        return (0);
 
 error:
        (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
-       free(ctx);
+       evrpc_request_wrapper_free(ctx);
        return (-1);
 }
 
@@ -354,4 +378,18 @@ static void
 evrpc_reply_done(struct evhttp_request *req, void *arg)
 {
        struct evrpc_request_wrapper *ctx = arg;
+       int res;
+
+       /* we need to get the reply now */
+       res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
+       if (res == -1) {
+               /* clear everything that we might have written previously */
+               ctx->reply_clear(ctx->reply);
+       }
+
+       (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
+       
+       evrpc_request_wrapper_free(ctx);
+
+       /* the http layer owns the request structure */
 }
index 7e4b3e388ffc3125d1f5438209f981ce0b3228c8..ceae96415c4a590f79ccc0b96415f2c9af20d4a1 100644 (file)
@@ -51,6 +51,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
+#include <assert.h>
 
 #include "event.h"
 #include "evhttp.h"
@@ -275,9 +276,108 @@ rpc_basic_message(void)
        evhttp_free(http);
 }
 
+static struct evrpc_pool *
+rpc_pool_with_connection(short port)
+{
+       struct evhttp_connection *evcon;
+       struct evrpc_pool *pool;
+
+       pool = evrpc_pool_new();
+       assert(pool != NULL);
+
+       evcon = evhttp_connection_new("127.0.0.1", port);
+       assert(evcon != NULL);
+
+       evrpc_pool_add_connection(pool, evcon);
+       
+       return (pool);
+}
+
+static void
+GotKillCb(struct msg *msg, struct kill *kill, void *arg)
+{
+       char *weapon;
+       char *action;
+
+       if (EVTAG_GET(kill, weapon, &weapon) == -1) {
+               fprintf(stderr, "get weapon\n");
+               goto done;
+       }
+       if (EVTAG_GET(kill, action, &action) == -1) {
+               fprintf(stderr, "get action\n");
+               goto done;
+       }
+
+       if (strcmp(weapon, "dagger"))
+               goto done;
+
+       if (strcmp(action, "wave around like an idiot"))
+               goto done;
+
+       test_ok += 1;
+done:
+       event_loopexit(NULL);
+}
+
+static void
+rpc_basic_client(void)
+{
+       short port;
+       struct evhttp *http = NULL;
+       struct evrpc_base *base = NULL;
+       struct evrpc_pool *pool = NULL;
+       struct msg *msg;
+       struct kill *kill;
+
+       fprintf(stdout, "Testing RPC Client: ");
+
+       rpc_setup(&http, &port, &base);
+
+       pool = rpc_pool_with_connection(port);
+
+       /* set up the basic message */
+       msg = msg_new();
+       EVTAG_ASSIGN(msg, from_name, "niels");
+       EVTAG_ASSIGN(msg, to_name, "tester");
+
+       kill = kill_new();
+
+       EVRPC_MAKE_REQUEST(Message, msg, kill,  GotKillCb, NULL);
+
+       test_ok = 0;
+
+       event_dispatch();
+       
+       if (test_ok != 1) {
+               fprintf(stdout, "FAILED (1)\n");
+               exit(1);
+       }
+
+       /* we do it twice to make sure that reuse works correctly */
+       kill_clear(kill);
+
+       EVRPC_MAKE_REQUEST(Message, msg, kill,  GotKillCb, NULL);
+
+       event_dispatch();
+       
+       if (test_ok != 2) {
+               fprintf(stdout, "FAILED (2)\n");
+               exit(1);
+       }
+
+       fprintf(stdout, "OK\n");
+
+       msg_free(msg);
+       kill_free(kill);
+
+       evrpc_pool_free(pool);
+       evhttp_free(http);
+}
+
 void
 rpc_suite(void)
 {
        rpc_basic_test();
        rpc_basic_message();
+       rpc_basic_client();
 }