return (pool);
}
+static void
+evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
+{
+ free(request->name);
+ free(request);
+}
+
void
evrpc_pool_free(struct evrpc_pool *pool)
{
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
TAILQ_REMOVE(&pool->requests, request, next);
/* if this gets more complicated we need our own function */
- free(request->name);
- free(request);
+ evrpc_request_wrapper_free(request);
}
while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
free(pool);
}
+/*
+ * Add a connection to the RPC pool. A request scheduled on the pool
+ * may use any available connection.
+ */
+
void
evrpc_pool_add_connection(struct evrpc_pool *pool,
struct evhttp_connection *connection) {
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
/*
- * if we have any requests, pending schedule them with the new
+ * if we have any requests pending, schedule them with the new
* connections.
*/
evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx)
{
- struct evbuffer *output;
- struct evhttp_request *req;
- if ((output = evbuffer_new()) == NULL)
- goto error;
+ struct evhttp_request *req = NULL;
+ char *uri = NULL;
+ int res = 0;
if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
goto error;
+ /* serialize the request data into the output buffer */
+ ctx->request_marshal(req->output_buffer, ctx->request);
+
+ uri = evrpc_construct_uri(ctx->name);
+ if (uri == NULL)
+ goto error;
+
+ /* start the request over the connection */
+ res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
+ free(uri);
+
+ if (res == -1)
+ goto error;
+
return (0);
error:
(*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
- free(ctx);
+ evrpc_request_wrapper_free(ctx);
return (-1);
}
evrpc_reply_done(struct evhttp_request *req, void *arg)
{
struct evrpc_request_wrapper *ctx = arg;
+ int res;
+
+ /* we need to get the reply now */
+ res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
+ if (res == -1) {
+ /* clear everything that we might have written previously */
+ ctx->reply_clear(ctx->reply);
+ }
+
+ (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
+
+ evrpc_request_wrapper_free(ctx);
+
+ /* the http layer owns the request structure */
}
#include <stdio.h>
#include <string.h>
#include <errno.h>
+#include <assert.h>
#include "event.h"
#include "evhttp.h"
evhttp_free(http);
}
+static struct evrpc_pool *
+rpc_pool_with_connection(short port)
+{
+ struct evhttp_connection *evcon;
+ struct evrpc_pool *pool;
+
+ pool = evrpc_pool_new();
+ assert(pool != NULL);
+
+ evcon = evhttp_connection_new("127.0.0.1", port);
+ assert(evcon != NULL);
+
+ evrpc_pool_add_connection(pool, evcon);
+
+ return (pool);
+}
+
+static void
+GotKillCb(struct msg *msg, struct kill *kill, void *arg)
+{
+ char *weapon;
+ char *action;
+
+ if (EVTAG_GET(kill, weapon, &weapon) == -1) {
+ fprintf(stderr, "get weapon\n");
+ goto done;
+ }
+ if (EVTAG_GET(kill, action, &action) == -1) {
+ fprintf(stderr, "get action\n");
+ goto done;
+ }
+
+ if (strcmp(weapon, "dagger"))
+ goto done;
+
+ if (strcmp(action, "wave around like an idiot"))
+ goto done;
+
+ test_ok += 1;
+done:
+ event_loopexit(NULL);
+}
+
+static void
+rpc_basic_client(void)
+{
+ short port;
+ struct evhttp *http = NULL;
+ struct evrpc_base *base = NULL;
+ struct evrpc_pool *pool = NULL;
+ struct msg *msg;
+ struct kill *kill;
+
+ fprintf(stdout, "Testing RPC Client: ");
+
+ rpc_setup(&http, &port, &base);
+
+ pool = rpc_pool_with_connection(port);
+
+ /* set up the basic message */
+ msg = msg_new();
+ EVTAG_ASSIGN(msg, from_name, "niels");
+ EVTAG_ASSIGN(msg, to_name, "tester");
+
+ kill = kill_new();
+
+ EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL);
+
+ test_ok = 0;
+
+ event_dispatch();
+
+ if (test_ok != 1) {
+ fprintf(stdout, "FAILED (1)\n");
+ exit(1);
+ }
+
+ /* we do it twice to make sure that reuse works correctly */
+ kill_clear(kill);
+
+ EVRPC_MAKE_REQUEST(Message, msg, kill, GotKillCb, NULL);
+
+ event_dispatch();
+
+ if (test_ok != 2) {
+ fprintf(stdout, "FAILED (2)\n");
+ exit(1);
+ }
+
+ fprintf(stdout, "OK\n");
+
+ msg_free(msg);
+ kill_free(kill);
+
+ evrpc_pool_free(pool);
+ evhttp_free(http);
+}
+
void
rpc_suite(void)
{
rpc_basic_test();
rpc_basic_message();
+ rpc_basic_client();
}