void *process_arg;
};
+TAILQ_HEAD(evrpc_hook_list, evrpc_hook);
+
+/*
+ * this is shared between the base and the pool, so that we can reuse
+ * the hook adding functions; we alias both evrpc_pool and evrpc_base
+ * to this common structure.
+ */
+struct _evrpc_hooks {
+ /* hooks for processing outbound and inbound rpcs */
+ struct evrpc_hook_list in_hooks;
+ struct evrpc_hook_list out_hooks;
+};
+
+#define input_hooks common.in_hooks
+#define output_hooks common.out_hooks
+
struct evrpc_base {
+ struct _evrpc_hooks common;
+
/* the HTTP server under which we register our RPC calls */
struct evhttp* http_server;
/* a list of all RPCs registered with us */
TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;
-
- /* hooks for processing outbound and inbound rpcs */
- TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
- struct evrpc_hook_list output_hooks;
};
struct evrpc_req_generic;
/* A pool for holding evhttp_connection objects */
struct evrpc_pool {
+ struct _evrpc_hooks common;
+
+ struct event_base *base;
+
struct evconq connections;
int timeout;
}
void *
-evrpc_add_hook(struct evrpc_base *base,
+evrpc_add_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg)
{
+ struct _evrpc_hooks *base = vbase;
struct evrpc_hook_list *head = NULL;
struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
- head = &base->input_hooks;
+ head = &base->in_hooks;
break;
case OUTPUT:
- head = &base->output_hooks;
+ head = &base->out_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
return (hook);
}
+static int
+evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
+{
+ struct evrpc_hook *hook = NULL;
+ TAILQ_FOREACH(hook, head, next) {
+ if (hook == handle) {
+ TAILQ_REMOVE(head, hook, next);
+ free(hook);
+ return (1);
+ }
+ }
+
+ return (0);
+}
+
/*
* remove the hook specified by the handle
*/
int
-evrpc_remove_hook(struct evrpc_base *base,
- enum EVRPC_HOOK_TYPE hook_type,
- void *handle)
+evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
{
+ struct _evrpc_hooks *base = vbase;
struct evrpc_hook_list *head = NULL;
- struct evrpc_hook *hook = NULL;
switch (hook_type) {
case INPUT:
- head = &base->input_hooks;
+ head = &base->in_hooks;
break;
case OUTPUT:
- head = &base->output_hooks;
+ head = &base->out_hooks;
break;
default:
assert(hook_type == INPUT || hook_type == OUTPUT);
}
- TAILQ_FOREACH(hook, head, next) {
- if (hook == handle) {
- TAILQ_REMOVE(head, hook, next);
- free(hook);
- return (1);
- }
- }
-
- return (0);
+ return (evrpc_remove_hook_internal(head, handle));
}
static int
struct evrpc_request_wrapper *ctx);
struct evrpc_pool *
-evrpc_pool_new(void)
+evrpc_pool_new(struct event_base *base)
{
struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
if (pool == NULL)
TAILQ_INIT(&pool->connections);
TAILQ_INIT(&pool->requests);
+ TAILQ_INIT(&pool->input_hooks);
+ TAILQ_INIT(&pool->output_hooks);
+
+ pool->base = base;
pool->timeout = -1;
return (pool);
{
struct evhttp_connection *connection;
struct evrpc_request_wrapper *request;
+ struct evrpc_hook *hook;
while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
TAILQ_REMOVE(&pool->requests, request, next);
evhttp_connection_free(connection);
}
+ while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
+ assert(evrpc_remove_hook(pool, INPUT, hook));
+ }
+
+ while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
+ assert(evrpc_remove_hook(pool, OUTPUT, hook));
+ }
+
free(pool);
}
assert(connection->http_server == NULL);
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
+ /*
+ * associate an event base with this connection
+ */
+ if (pool->base != NULL)
+ evhttp_connection_set_base(connection, pool->base);
+
/*
* unless a timeout was specifically set for a connection,
* the connection inherits the timeout from the pool.
/* we need to know the connection that we might have to abort */
ctx->evcon = connection;
+ /* apply hooks to the outgoing request */
+ if (evrpc_process_hooks(&pool->output_hooks,
+ req, req->output_buffer) == -1)
+ goto error;
+
if (pool->timeout > 0) {
/*
* a timeout after which the whole rpc is going to be aborted.
/* initialize the event structure for this rpc */
evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
+ if (pool->base != NULL)
+ event_base_set(pool->base, &ctx->ev_timeout);
/* we better have some available connections on the pool */
assert(TAILQ_FIRST(&pool->connections) != NULL);
/* we need to get the reply now */
if (req != NULL) {
- res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
- if (res == -1) {
- status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
+ /* apply hooks to the incoming request */
+ if (evrpc_process_hooks(&pool->input_hooks,
+ req, req->input_buffer) == -1) {
+ status.error = EVRPC_STATUS_ERR_HOOKABORTED;
+ res = -1;
+ } else {
+ res = ctx->reply_unmarshal(ctx->reply,
+ req->input_buffer);
+ if (res == -1) {
+ status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
+ }
}
} else {
status.error = EVRPC_STATUS_ERR_TIMEOUT;
}
+
if (res == -1) {
/* clear everything that we might have written previously */
ctx->reply_clear(ctx->reply);
*/
struct evbuffer;
+struct event_base;
struct evrpc_req_generic;
/* Encapsulates a request */
#define EVRPC_STATUS_ERR_TIMEOUT 1
#define EVRPC_STATUS_ERR_BADPAYLOAD 2
#define EVRPC_STATUS_ERR_UNSTARTED 3
+#define EVRPC_STATUS_ERR_HOOKABORTED 4
int error;
/* for looking at headers or other information */
* a pool has a number of connections associated with it.
* rpc requests are always made via a pool.
*/
-struct evrpc_pool *evrpc_pool_new(void);
+struct evrpc_pool *evrpc_pool_new(struct event_base *);
void evrpc_pool_free(struct evrpc_pool *);
+/*
+ * adds a connection over which rpc can be dispatched. the connection
+ * object must have been newly created.
+ */
void evrpc_pool_add_connection(struct evrpc_pool *,
struct evhttp_connection *);
* Hooks for changing the input and output of RPCs; this can be used to
* implement compression, authentication, encryption, ...
*
+ * vbase may either be a pointer to struct evrpc_base or to struct evrpc_pool
+ *
* If a hook returns -1, the processing is aborted.
*
* The add functions return handles that can be used for removing hooks.
INPUT, OUTPUT
};
-void *evrpc_add_hook(struct evrpc_base *base,
+void *evrpc_add_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type,
int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
void *cb_arg);
-int evrpc_remove_hook(struct evrpc_base *base,
+int evrpc_remove_hook(void *vbase,
enum EVRPC_HOOK_TYPE hook_type,
void *handle);
struct evhttp_connection *evcon;
struct evrpc_pool *pool;
- pool = evrpc_pool_new();
+ pool = evrpc_pool_new(NULL);
assert(pool != NULL);
evcon = evhttp_connection_new("127.0.0.1", port);
if (need_output_hook) {
struct evhttp_request *req = status->http_req;
const char *header = evhttp_find_header(
- req->input_headers, "X-Hook");
- assert(strcmp(header, "output") == 0);
+ req->input_headers, "X-Pool-Hook");
+ assert(strcmp(header, "ran") == 0);
}
if (status->error != EVRPC_STATUS_ERR_NONE)
return (0);
}
+static int
+rpc_hook_remove_header(struct evhttp_request *req,
+ struct evbuffer *evbuf, void *arg)
+{
+ const char *header = evhttp_find_header(req->input_headers, "X-Hook");
+ assert(header != NULL);
+ assert(strcmp(header, arg) == 0);
+ evhttp_remove_header(req->input_headers, "X-Hook");
+ evhttp_add_header(req->input_headers, "X-Pool-Hook", "ran");
+
+ return (0);
+}
+
static void
rpc_basic_client(void)
{
pool = rpc_pool_with_connection(port);
+ assert(evrpc_add_hook(pool, INPUT, rpc_hook_remove_header, "output"));
+
/* set up the basic message */
msg = msg_new();
EVTAG_ASSIGN(msg, from_name, "niels");