From ff43ed5b3379125e3b57044016ca71b9b8f4739f Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Mon, 20 Nov 2006 07:44:37 +0000 Subject: [PATCH] finish RPC client support svn:r269 --- evrpc.c | 54 ++++++++++++++++++++---- test/regress_rpc.c | 100 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 8 deletions(-) diff --git a/evrpc.c b/evrpc.c index 2b94d86a..e422ccb6 100644 --- a/evrpc.c +++ b/evrpc.c @@ -243,6 +243,13 @@ evrpc_pool_new() return (pool); } +static void +evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) +{ + free(request->name); + free(request); +} + void evrpc_pool_free(struct evrpc_pool *pool) { @@ -252,8 +259,7 @@ evrpc_pool_free(struct evrpc_pool *pool) while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { TAILQ_REMOVE(&pool->requests, request, next); /* if this gets more complicated we need our own function */ - free(request->name); - free(request); + evrpc_request_wrapper_free(request); } while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { @@ -264,6 +270,11 @@ evrpc_pool_free(struct evrpc_pool *pool) free(pool); } +/* + * Add a connection to the RPC pool. A request scheduled on the pool + * may use any available connection. + */ + void evrpc_pool_add_connection(struct evrpc_pool *pool, struct evhttp_connection *connection) { @@ -271,7 +282,7 @@ evrpc_pool_add_connection(struct evrpc_pool *pool, TAILQ_INSERT_TAIL(&pool->connections, connection, next); /* - * if we have any requests, pending schedule them with the new + * if we have any requests pending, schedule them with the new * connections. */ @@ -309,19 +320,32 @@ static int evrpc_schedule_request(struct evhttp_connection *connection, struct evrpc_request_wrapper *ctx) { - struct evbuffer *output; - struct evhttp_request *req; - if ((output = evbuffer_new()) == NULL) - goto error; + struct evhttp_request *req = NULL; + char *uri = NULL; + int res = 0; if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) goto error; + /* serialize the request data into the output buffer */ + ctx->request_marshal(req->output_buffer, ctx->request); + + uri = evrpc_construct_uri(ctx->name); + if (uri == NULL) + goto error; + + /* start the request over the connection */ + res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); + free(uri); + + if (res == -1) + goto error; + return (0); error: (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg); - free(ctx); + evrpc_request_wrapper_free(ctx); return (-1); } @@ -354,4 +378,18 @@ static void evrpc_reply_done(struct evhttp_request *req, void *arg) { struct evrpc_request_wrapper *ctx = arg; + int res; + + /* we need to get the reply now */ + res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); + if (res == -1) { + /* clear everything that we might have written previously */ + ctx->reply_clear(ctx->reply); + } + + (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg); + + evrpc_request_wrapper_free(ctx); + + /* the http layer owns the request structure */ } diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 7e4b3e38..ceae9641 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -51,6 +51,7 @@ #include #include #include +#include #include "event.h" #include "evhttp.h" @@ -275,9 +276,108 @@ rpc_basic_message(void) evhttp_free(http); } +static struct evrpc_pool * +rpc_pool_with_connection(short port) +{ + struct evhttp_connection *evcon; + struct evrpc_pool *pool; + + pool = evrpc_pool_new(); + assert(pool != NULL); + + evcon = evhttp_connection_new("127.0.0.1", port); + assert(evcon != NULL); + + evrpc_pool_add_connection(pool, evcon); + + return (pool); +} + +static void +GotKillCb(struct msg *msg, struct kill *kill, void *arg) +{ + char *weapon; + char *action; + + if (EVTAG_GET(kill, weapon, &weapon) == -1) { + fprintf(stderr, "get weapon\n"); + goto done; + } + if (EVTAG_GET(kill, action, &action) == -1) { + fprintf(stderr, "get action\n"); + goto done; + } + + if (strcmp(weapon, "dagger")) + goto done; + + if (strcmp(action, "wave around like an idiot")) + goto done; + + test_ok += 1; +done: + event_loopexit(NULL); +} + +static void +rpc_basic_client(void) +{ + short port; + struct evhttp *http = NULL; + struct evrpc_base *base = NULL; + struct evrpc_pool *pool = NULL; + struct msg *msg; + struct kill *kill; + + fprintf(stdout, "Testing RPC Client: "); + + rpc_setup(&http, &port, &base); + + pool = rpc_pool_with_connection(port); + + /* set up the basic message */ + msg = msg_new(); + EVTAG_ASSIGN(msg, from_name, "niels"); + EVTAG_ASSIGN(msg, to_name, "tester"); + + kill = kill_new(); + + EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL); + + test_ok = 0; + + event_dispatch(); + + if (test_ok != 1) { + fprintf(stdout, "FAILED (1)\n"); + exit(1); + } + + /* we do it twice to make sure that reuse works correctly */ + kill_clear(kill); + + EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL); + + event_dispatch(); + + if (test_ok != 2) { + fprintf(stdout, "FAILED (2)\n"); + exit(1); + } + + fprintf(stdout, "OK\n"); + + msg_free(msg); + kill_free(kill); + + evrpc_pool_free(pool); + evhttp_free(http); +} + void rpc_suite(void) { rpc_basic_test(); rpc_basic_message(); + rpc_basic_client(); } -- 2.40.0