From 819d4a33d766d1d585f73abb9d7c77d4f862970f Mon Sep 17 00:00:00 2001 From: Niels Provos Date: Thu, 27 Dec 2007 23:17:24 +0000 Subject: [PATCH] allow hooks to pause RPC processing; this will allow hooks to do meaningful work before resuming the RPC processing; this is not backwards compatible. svn:r617 --- ChangeLog | 1 + evrpc-internal.h | 18 ++- evrpc.c | 320 ++++++++++++++++++++++++++++++++++++--------- evrpc.h | 34 ++++- test/regress_rpc.c | 101 +++++++++++++- 5 files changed, 408 insertions(+), 66 deletions(-) diff --git a/ChangeLog b/ChangeLog index 43686100..a84afd72 100644 --- a/ChangeLog +++ b/ChangeLog @@ -30,6 +30,7 @@ Changes in current version: o removed linger from http server socket; reported by Ilya Martynov o event_rpcgen now allows creating integer arrays o support string arrays in event_rpcgen + o change evrpc hooking to allow pausing of RPCs; this will make it possible for the hook to do some meaning ful work; this is not backwards compatible. Changes in 1.4.0: o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. diff --git a/evrpc-internal.h b/evrpc-internal.h index c900f959..74183530 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -37,7 +37,8 @@ struct evrpc_hook { TAILQ_ENTRY(evrpc_hook) (next); /* returns -1; if the rpc should be aborted, is allowed to rewrite */ - int (*process)(struct evhttp_request *, struct evbuffer *, void *); + int (*process)(void *, struct evhttp_request *, + struct evbuffer *, void *); void *process_arg; }; @@ -48,14 +49,21 @@ TAILQ_HEAD(evrpc_hook_list, evrpc_hook); * the hook adding functions; we alias both evrpc_pool and evrpc_base * to this common structure. */ + +struct evrpc_hook_ctx; +TAILQ_HEAD(evrpc_pause_list, evrpc_hook_ctx); + struct _evrpc_hooks { /* hooks for processing outbound and inbound rpcs */ struct evrpc_hook_list in_hooks; struct evrpc_hook_list out_hooks; + + struct evrpc_pause_list pause_requests; }; #define input_hooks common.in_hooks #define output_hooks common.out_hooks +#define paused_requests common.pause_requests struct evrpc_base { struct _evrpc_hooks common; @@ -80,8 +88,14 @@ struct evrpc_pool { int timeout; - TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests; + TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) (requests); }; +struct evrpc_hook_ctx { + TAILQ_ENTRY(evrpc_hook_ctx) (next); + + void *ctx; + void (*cb)(void *, enum EVRPC_HOOK_RESULT); +}; #endif /* _EVRPC_INTERNAL_H_ */ diff --git a/evrpc.c b/evrpc.c index 670f3c0e..5586c4ec 100644 --- a/evrpc.c +++ b/evrpc.c @@ -75,6 +75,9 @@ evrpc_init(struct evhttp *http_server) TAILQ_INIT(&base->registered_rpcs); TAILQ_INIT(&base->input_hooks); TAILQ_INIT(&base->output_hooks); + + TAILQ_INIT(&base->paused_requests); + base->http_server = http_server; return (base); @@ -101,7 +104,7 @@ evrpc_free(struct evrpc_base *base) void * evrpc_add_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, - int (*cb)(struct evhttp_request *, struct evbuffer *, void *), + int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), void *cb_arg) { struct _evrpc_hooks *base = vbase; @@ -167,12 +170,12 @@ evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) } static int -evrpc_process_hooks(struct evrpc_hook_list *head, +evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, struct evhttp_request *req, struct evbuffer *evbuf) { struct evrpc_hook *hook; TAILQ_FOREACH(hook, head, next) { - if (hook->process(req, evbuf, hook->process_arg) == -1) + if (hook->process(ctx, req, evbuf, hook->process_arg) == -1) return (-1); } @@ -257,28 +260,70 @@ evrpc_unregister_rpc(struct evrpc_base *base, const char *name) return (0); } +static int evrpc_pause_request(void *vbase, void *ctx, + void (*cb)(void *, enum EVRPC_HOOK_RESULT)); +static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); + static void evrpc_request_cb(struct evhttp_request *req, void *arg) { struct evrpc *rpc = arg; struct evrpc_req_generic *rpc_state = NULL; + int hook_res; /* let's verify the outside parameters */ if (req->type != EVHTTP_REQ_POST || EVBUFFER_LENGTH(req->input_buffer) <= 0) goto error; + rpc_state = event_calloc(1, sizeof(struct evrpc_req_generic)); + if (rpc_state == NULL) + goto error; + rpc_state->rpc = rpc; + rpc_state->http_req = req; + rpc_state->rpc_data = NULL; + rpc_state->done = evrpc_request_done; + + /* * we might want to allow hooks to suspend the processing, * but at the moment, we assume that they just act as simple * filters. */ - if (evrpc_process_hooks(&rpc->base->input_hooks, - req, req->input_buffer) == -1) + hook_res = evrpc_process_hooks(&rpc->base->input_hooks, + rpc_state, req, req->input_buffer); + switch (hook_res) { + case EVRPC_TERMINATE: goto error; + case EVRPC_PAUSE: + evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_cb_closure); + return; + case EVRPC_CONTINUE: + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + } - rpc_state = event_calloc(1, sizeof(struct evrpc_req_generic)); - if (rpc_state == NULL) + evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); + return; + +error: + if (rpc_state != NULL) + evrpc_reqstate_free(rpc_state); + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); + return; +} + +static void +evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_req_generic *rpc_state = arg; + struct evrpc *rpc = rpc_state->rpc; + struct evhttp_request *req = rpc_state->http_req; + + if (hook_res == EVRPC_TERMINATE) goto error; /* let's check that we can parse the request */ @@ -286,8 +331,6 @@ evrpc_request_cb(struct evhttp_request *req, void *arg) if (rpc_state->request == NULL) goto error; - rpc_state->rpc = rpc; - if (rpc->request_unmarshal( rpc_state->request, req->input_buffer) == -1) { /* we failed to parse the request; that's a bummer */ @@ -300,76 +343,110 @@ evrpc_request_cb(struct evhttp_request *req, void *arg) if (rpc_state->reply == NULL) goto error; - rpc_state->http_req = req; - rpc_state->done = evrpc_request_done; - /* give the rpc to the user; they can deal with it */ rpc->cb(rpc_state, rpc->cb_arg); return; error: - evrpc_reqstate_free(rpc_state); + if (rpc_state != NULL) + evrpc_reqstate_free(rpc_state); evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); return; } + void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state) { + struct evrpc *rpc; + assert(rpc_state != NULL); + rpc = rpc_state->rpc; + /* clean up all memory */ - if (rpc_state != NULL) { - struct evrpc *rpc = rpc_state->rpc; - - if (rpc_state->request != NULL) - rpc->request_free(rpc_state->request); - if (rpc_state->reply != NULL) - rpc->reply_free(rpc_state->reply); - event_free(rpc_state); - } + if (rpc_state->request != NULL) + rpc->request_free(rpc_state->request); + if (rpc_state->reply != NULL) + rpc->reply_free(rpc_state->reply); + if (rpc_state->rpc_data != NULL) + evbuffer_free(rpc_state->rpc_data); + event_free(rpc_state); } +static void +evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); + void -evrpc_request_done(struct evrpc_req_generic* rpc_state) +evrpc_request_done(struct evrpc_req_generic *rpc_state) { struct evhttp_request *req = rpc_state->http_req; struct evrpc *rpc = rpc_state->rpc; - struct evbuffer* data = NULL; + int hook_res; if (rpc->reply_complete(rpc_state->reply) == -1) { /* the reply was not completely filled in. error out */ goto error; } - if ((data = evbuffer_new()) == NULL) { + if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { /* out of memory */ goto error; } /* serialize the reply */ - rpc->reply_marshal(data, rpc_state->reply); + rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); /* do hook based tweaks to the request */ - if (evrpc_process_hooks(&rpc->base->output_hooks, - req, data) == -1) + hook_res = evrpc_process_hooks(&rpc->base->output_hooks, + rpc_state, req, rpc_state->rpc_data); + switch (hook_res) { + case EVRPC_TERMINATE: goto error; + case EVRPC_PAUSE: + if (evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_done_closure) == -1) + goto error; + return; + case EVRPC_CONTINUE: + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + } - evhttp_send_reply(req, HTTP_OK, "OK", data); + evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); + return; + +error: + if (rpc_state != NULL) + evrpc_reqstate_free(rpc_state); + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); + return; +} + +static void +evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_req_generic *rpc_state = arg; + struct evhttp_request *req = rpc_state->http_req; + + if (hook_res == EVRPC_TERMINATE) + goto error; - evbuffer_free(data); + evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); evrpc_reqstate_free(rpc_state); return; error: - if (data != NULL) - evbuffer_free(data); - evrpc_reqstate_free(rpc_state); + if (rpc_state != NULL) + evrpc_reqstate_free(rpc_state); evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error"); return; } + /* Client implementation of RPC site */ static int evrpc_schedule_request(struct evhttp_connection *connection, @@ -385,6 +462,8 @@ evrpc_pool_new(struct event_base *base) TAILQ_INIT(&pool->connections); TAILQ_INIT(&pool->requests); + TAILQ_INIT(&pool->paused_requests); + TAILQ_INIT(&pool->input_hooks); TAILQ_INIT(&pool->output_hooks); @@ -406,14 +485,19 @@ evrpc_pool_free(struct evrpc_pool *pool) { struct evhttp_connection *connection; struct evrpc_request_wrapper *request; + struct evrpc_hook_ctx *pause; struct evrpc_hook *hook; while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { TAILQ_REMOVE(&pool->requests, request, next); - /* if this gets more complicated we need our own function */ evrpc_request_wrapper_free(request); } + while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { + TAILQ_REMOVE(&pool->paused_requests, pause, next); + event_free(pause); + } + while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { TAILQ_REMOVE(&pool->connections, connection, next); evhttp_connection_free(connection); @@ -497,6 +581,12 @@ evrpc_pool_find_connection(struct evrpc_pool *pool) return (NULL); } +/* + * Prototypes responsible for evrpc scheduling and hooking + */ + +static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); + /* * We assume that the ctx is no longer queued on the pool. */ @@ -507,8 +597,7 @@ evrpc_schedule_request(struct evhttp_connection *connection, struct evhttp_request *req = NULL; struct evrpc_pool *pool = ctx->pool; struct evrpc_status status; - char *uri = NULL; - int res = 0; + int hook_res = 0; if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) goto error; @@ -516,16 +605,60 @@ evrpc_schedule_request(struct evhttp_connection *connection, /* serialize the request data into the output buffer */ ctx->request_marshal(req->output_buffer, ctx->request); - uri = evrpc_construct_uri(ctx->name); - if (uri == NULL) - goto error; - /* we need to know the connection that we might have to abort */ ctx->evcon = connection; + /* if we get paused we also need to know the request */ + ctx->req = req; + /* apply hooks to the outgoing request */ - if (evrpc_process_hooks(&pool->output_hooks, - req, req->output_buffer) == -1) + hook_res = evrpc_process_hooks(&pool->output_hooks, + ctx, req, req->output_buffer); + + switch (hook_res) { + case EVRPC_TERMINATE: + goto error; + case EVRPC_PAUSE: + /* we need to be explicitly resumed */ + if (evrpc_pause_request(pool, ctx, + evrpc_schedule_request_closure) == -1) + goto error; + return (0); + case EVRPC_CONTINUE: + /* we can just continue */ + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + } + + evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); + return (0); + +error: + memset(&status, 0, sizeof(status)); + status.error = EVRPC_STATUS_ERR_UNSTARTED; + (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); + evrpc_request_wrapper_free(ctx); + return (-1); +} + +static void +evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_connection *connection = ctx->evcon; + struct evhttp_request *req = ctx->req; + struct evrpc_pool *pool = ctx->pool; + struct evrpc_status status; + char *uri = NULL; + int res = 0; + + if (hook_res == EVRPC_TERMINATE) + goto error; + + uri = evrpc_construct_uri(ctx->name); + if (uri == NULL) goto error; if (pool->timeout > 0) { @@ -545,14 +678,50 @@ evrpc_schedule_request(struct evhttp_connection *connection, if (res == -1) goto error; - return (0); + return; error: memset(&status, 0, sizeof(status)); status.error = EVRPC_STATUS_ERR_UNSTARTED; (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); evrpc_request_wrapper_free(ctx); - return (-1); +} + +/* we just queue the paused request on the pool under the req object */ +static int +evrpc_pause_request(void *vbase, void *ctx, + void (*cb)(void *, enum EVRPC_HOOK_RESULT)) +{ + struct _evrpc_hooks *base = vbase; + struct evrpc_hook_ctx *pause = event_malloc(sizeof(*pause)); + if (pause == NULL) + return (-1); + + pause->ctx = ctx; + pause->cb = cb; + + TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); + return (0); +} + +int +evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) +{ + struct _evrpc_hooks *base = vbase; + struct evrpc_pause_list *head = &base->pause_requests; + struct evrpc_hook_ctx *pause; + + TAILQ_FOREACH(pause, head, next) { + if (pause->ctx == ctx) + break; + } + + if (pause == NULL) + return (-1); + + (*pause->cb)(pause->ctx, res); + TAILQ_REMOVE(head, pause, next); + return (0); } int @@ -579,36 +748,67 @@ evrpc_make_request(struct evrpc_request_wrapper *ctx) return (0); } +static void +evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); + static void evrpc_reply_done(struct evhttp_request *req, void *arg) { struct evrpc_request_wrapper *ctx = arg; struct evrpc_pool *pool = ctx->pool; - struct evrpc_status status; - int res = -1; + int hook_res; /* cancel any timeout we might have scheduled */ event_del(&ctx->ev_timeout); + /* if we get paused we also need to know the request */ + ctx->req = req; + + /* we need to get the reply now */ + if (req == NULL) { + evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); + return; + } + + /* apply hooks to the incoming request */ + hook_res = evrpc_process_hooks(&pool->input_hooks, + ctx, req, req->input_buffer); + + switch (hook_res) { + case EVRPC_TERMINATE: + case EVRPC_CONTINUE: + evrpc_reply_done_closure(ctx, hook_res); + return; + case EVRPC_PAUSE: + evrpc_pause_request(pool, ctx, evrpc_reply_done_closure); + return; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + } +} + +static void +evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_request *req = ctx->req; + struct evrpc_pool *pool = ctx->pool; + struct evrpc_status status; + int res = -1; + memset(&status, 0, sizeof(status)); status.http_req = req; /* we need to get the reply now */ - if (req != NULL) { - /* apply hooks to the incoming request */ - if (evrpc_process_hooks(&pool->input_hooks, - req, req->input_buffer) == -1) { - status.error = EVRPC_STATUS_ERR_HOOKABORTED; - res = -1; - } else { - res = ctx->reply_unmarshal(ctx->reply, - req->input_buffer); - if (res == -1) { - status.error = EVRPC_STATUS_ERR_BADPAYLOAD; - } - } - } else { + if (req == NULL) { status.error = EVRPC_STATUS_ERR_TIMEOUT; + } else if (hook_res == EVRPC_TERMINATE) { + status.error = EVRPC_STATUS_ERR_HOOKABORTED; + } else { + res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); + if (res == -1) + status.error = EVRPC_STATUS_ERR_BADPAYLOAD; } if (res == -1) { diff --git a/evrpc.h b/evrpc.h index 45f684ac..a1dd02aa 100644 --- a/evrpc.h +++ b/evrpc.h @@ -134,6 +134,11 @@ struct evrpc_req_generic { */ struct evhttp_request* http_req; + /* + * Temporary data store for marshaled data + */ + struct evbuffer* rpc_data; + /* * callback to reply and finish answering this rpc */ @@ -157,6 +162,7 @@ EVRPC_STRUCT(rpcname) { \ struct rplystruct* reply; \ struct evrpc* rpc; \ struct evhttp_request* http_req; \ + struct evbuffer* rpc_data; \ void (*done)(struct evrpc_status *, \ struct evrpc* rpc, void *request, void *reply); \ }; \ @@ -346,6 +352,9 @@ struct evrpc_request_wrapper { /* connection on which the request is being sent */ struct evhttp_connection *evcon; + /* the actual request */ + struct evhttp_request *req; + /* event for implementing request timeouts */ struct event ev_timeout; @@ -440,9 +449,22 @@ enum EVRPC_HOOK_TYPE { OUTPUT /**< apply the function to an output hook */ }; +/** + * Return value from hook processing functions + */ + +enum EVRPC_HOOK_RESULT { + EVRPC_TERMINATE = -1, /**< indicates the rpc should be terminated */ + EVRPC_CONTINUE = 0, /**< continue processing the rpc */ + EVRPC_PAUSE = 1, /**< pause processing request until resumed */ +}; + /** adds a processing hook to either an rpc base or rpc pool * - * If a hook returns -1, the processing is aborted. + * If a hook returns TERMINATE, the processing is aborted. On CONTINUE, + * the request is immediately processed after the hook returns. If the + * hook returns PAUSE, request processing stops until evrpc_resume_request() + * has been called. * * The add functions return handles that can be used for removing hooks. * @@ -455,7 +477,7 @@ enum EVRPC_HOOK_TYPE { */ void *evrpc_add_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, - int (*cb)(struct evhttp_request *, struct evbuffer *, void *), + int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), void *cb_arg); /** removes a previously added hook @@ -470,6 +492,14 @@ int evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle); +/** resume a paused request + * + * @param vbase a pointer to either struct evrpc_base or struct evrpc_pool + * @param ctx the context pointer provided to the original hook call + */ +int +evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res); + #ifdef __cplusplus } #endif diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 9fc1d50b..cde31f88 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -409,7 +409,7 @@ done: } static int -rpc_hook_add_header(struct evhttp_request *req, +rpc_hook_add_header(void *ctx, struct evhttp_request *req, struct evbuffer *evbuf, void *arg) { const char *hook_type = arg; @@ -421,7 +421,7 @@ rpc_hook_add_header(struct evhttp_request *req, } static int -rpc_hook_remove_header(struct evhttp_request *req, +rpc_hook_remove_header(void *ctx, struct evhttp_request *req, struct evbuffer *evbuf, void *arg) { const char *header = evhttp_find_header(req->input_headers, "X-Hook"); @@ -498,6 +498,9 @@ rpc_basic_client(void) evrpc_pool_free(pool); evhttp_free(http); + + need_input_hook = 0; + need_output_hook = 0; } /* @@ -569,6 +572,99 @@ done: event_loopexit(NULL); } +/* we just pause the rpc and continue it in the next callback */ + +struct _rpc_hook_ctx { + void *vbase; + void *ctx; +}; + +static void +rpc_hook_pause_cb(int fd, short what, void *arg) +{ + struct _rpc_hook_ctx *ctx = arg; + evrpc_resume_request(ctx->vbase, ctx->ctx, EVRPC_CONTINUE); +} + +static int +rpc_hook_pause(void *ctx, struct evhttp_request *req, struct evbuffer *evbuf, + void *arg) +{ + struct _rpc_hook_ctx *tmp = malloc(sizeof(*tmp)); + struct timeval tv; + + assert(tmp != NULL); + tmp->vbase = arg; + tmp->ctx = ctx; + + memset(&tv, 0, sizeof(tv)); + event_once(-1, EV_TIMEOUT, rpc_hook_pause_cb, tmp, &tv); + return EVRPC_PAUSE; +} + +static void +rpc_basic_client_with_pause(void) +{ + short port; + struct evhttp *http = NULL; + struct evrpc_base *base = NULL; + struct evrpc_pool *pool = NULL; + struct msg *msg; + struct kill *kill; + + fprintf(stdout, "Testing RPC Client with pause hooks: "); + + rpc_setup(&http, &port, &base); + + assert(evrpc_add_hook(base, INPUT, rpc_hook_pause, base)); + assert(evrpc_add_hook(base, OUTPUT, rpc_hook_pause, base)); + + pool = rpc_pool_with_connection(port); + + assert(evrpc_add_hook(pool, INPUT, rpc_hook_pause, pool)); + assert(evrpc_add_hook(pool, OUTPUT, rpc_hook_pause, pool)); + + /* set up the basic message */ + msg = msg_new(); + EVTAG_ASSIGN(msg, from_name, "niels"); + EVTAG_ASSIGN(msg, to_name, "tester"); + + kill = kill_new(); + + EVRPC_MAKE_REQUEST(Message, pool, msg, kill, GotKillCb, NULL); + + test_ok = 0; + + event_dispatch(); + + if (test_ok != 1) { + fprintf(stdout, "FAILED (1)\n"); + exit(1); + } + + /* we do it twice to make sure that reuse works correctly */ + kill_clear(kill); + + EVRPC_MAKE_REQUEST(Message, pool, msg, kill, GotKillCb, NULL); + + event_dispatch(); + + rpc_teardown(base); + + if (test_ok != 2) { + fprintf(stdout, "FAILED (2)\n"); + exit(1); + } + + fprintf(stdout, "OK\n"); + + msg_free(msg); + kill_free(kill); + + evrpc_pool_free(pool); + evhttp_free(http); +} + static void rpc_client_timeout(void) { @@ -627,5 +723,6 @@ rpc_suite(void) rpc_basic_message(); rpc_basic_client(); rpc_basic_queued_client(); + rpc_basic_client_with_pause(); rpc_client_timeout(); } -- 2.40.0