From a51ec65a24fc30239e22d6d3444f9ca8cf7ac9c0 Mon Sep 17 00:00:00 2001 From: Paul Querna Date: Sat, 28 Mar 2009 13:24:18 +0000 Subject: [PATCH] Work in Progress. Convert the 100ms timed callback to a single cleanup callback that is added when the request is ready to finish. Basically works, though it has some issues with flushing and closing the connection. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759460 13f79535-47bb-0310-9956-ffa450edef68 --- modules/proxy/mod_serf.c | 118 +++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 36 deletions(-) diff --git a/modules/proxy/mod_serf.c b/modules/proxy/mod_serf.c index 667b07f862..49e4cb5cd2 100644 --- a/modules/proxy/mod_serf.c +++ b/modules/proxy/mod_serf.c @@ -20,6 +20,7 @@ #include "http_core.h" #include "http_config.h" #include "http_protocol.h" +#include "http_request.h" #include "http_log.h" #include "serf.h" @@ -53,12 +54,41 @@ typedef struct { int done_headers; int keep_reading; request_rec *r; + apr_pool_t *serf_pool; + apr_bucket_brigade *tmpbb; serf_config_t *conf; serf_ssl_context_t *ssl_ctx; serf_bucket_alloc_t *bkt_alloc; } s_baton_t; +/** + * This works right now because all timers are invoked in the single listener + * thread in the Event MPM -- the same thread that serf callbacks are made + * from, so we don't technically need a mutex yet, but with the Simple MPM, + * invocations are made from worker threads, and we need to figure out locking + */ +static void timed_cleanup_callback(void *baton) +{ + s_baton_t *ctx = baton; + + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: timed_cleanup_callback"); + + /* Causes all serf connections to unregister from the event mpm */ + apr_pool_destroy(ctx->serf_pool); + if (ctx->rstatus) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r, + "serf: request returned: %d", ctx->rstatus); + ctx->r->status = HTTP_OK; + ap_die(ctx->rstatus, ctx->r); + } + else { + ap_finalize_request_protocol(ctx->r); + ap_process_request_after_handler(ctx->r); + return; + } +} + static void closed_connection(serf_connection_t *conn, void *closed_baton, apr_status_t why, @@ -66,12 +96,18 @@ static void closed_connection(serf_connection_t *conn, { s_baton_t *ctx = closed_baton; + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: closed_connection"); + if (why) { /* justin says that error handling isn't done yet. hah. */ /* XXXXXX: review */ ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, "Closed Connection Error"); ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR; - return; + } + + if (mpm_supprts_serf) { + ap_mpm_register_timed_callback(apr_time_from_msec(1), + timed_cleanup_callback, ctx); } } @@ -82,6 +118,8 @@ static serf_bucket_t* conn_setup(apr_socket_t *sock, serf_bucket_t *c; s_baton_t *ctx = setup_baton; + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: conn_setup "); + c = serf_bucket_socket_create(sock, ctx->bkt_alloc); if (ctx->want_ssl) { c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc); @@ -196,9 +234,12 @@ static serf_bucket_t* accept_response(serf_request_t *request, void *acceptor_baton, apr_pool_t *pool) { + s_baton_t *ctx = acceptor_baton; serf_bucket_t *c; serf_bucket_alloc_t *bkt_alloc; + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: accept_response"); + /* get the per-request bucket allocator */ bkt_alloc = serf_request_get_alloc(request); @@ -219,6 +260,8 @@ static apr_status_t handle_response(serf_request_t *request, apr_size_t len; serf_status_line sl; + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: handle_response"); + /* XXXXXXX: Create better error message. */ rv = serf_bucket_response_status(response, &sl); if (rv) { @@ -230,6 +273,11 @@ static apr_status_t handle_response(serf_request_t *request, ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR; + if (mpm_supprts_serf) { + ap_mpm_register_timed_callback(apr_time_from_msec(1), + timed_cleanup_callback, ctx); + } + return rv; } @@ -239,6 +287,8 @@ static apr_status_t handle_response(serf_request_t *request, **/ do { + apr_bucket *e; + apr_brigade_cleanup(ctx->tmpbb); rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len); if (SERF_BUCKET_READ_ERROR(rv)) { @@ -252,15 +302,32 @@ static apr_status_t handle_response(serf_request_t *request, serf_bucket_headers_do(hdrs, copy_headers_out, ctx); ctx->done_headers = 1; } - - /* XXXX: write to brigades and stuff. meh */ - ap_rwrite(data, len, ctx->r); + + + if (len > 0) { + /* TODO: make APR bucket <-> serf bucket stuff more magical. */ + e = apr_bucket_immortal_create(data, len, ctx->r->connection->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e); + } + + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: writing %"APR_SIZE_T_FMT" bytes", len); if (APR_STATUS_IS_EOF(rv)) { ctx->keep_reading = 0; + e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e); + + ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb); + + if (mpm_supprts_serf) { + ap_mpm_register_timed_callback(apr_time_from_msec(1), + timed_cleanup_callback, ctx); + } return APR_EOF; } + ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb); + /* XXXX: Should we send a flush now? */ if (APR_STATUS_IS_EAGAIN(rv)) { return APR_SUCCESS; @@ -283,6 +350,7 @@ static apr_status_t setup_request(serf_request_t *request, serf_bucket_t *hdrs_bkt; serf_bucket_t *body_bkt = NULL; + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: setup_request"); /* XXXXX: handle incoming request bodies */ *req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri, body_bkt, @@ -326,28 +394,6 @@ static apr_status_t setup_request(serf_request_t *request, return APR_SUCCESS; } -static void -timed_callback(void *baton) -{ - s_baton_t *ctx = baton; - - if (ctx->keep_reading) { - ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton); - } - else if (ctx->rstatus) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r, - "serf: request returned: %d", ctx->rstatus); - ctx->r->status = HTTP_OK; - ap_die(ctx->rstatus, ctx->r); - } - else { - ap_finalize_request_protocol(ctx->r); - ap_process_request_after_handler(ctx->r); - return; - } -} - - #ifndef apr_time_from_msec #define apr_time_from_msec(x) (x * 1000) #endif @@ -356,7 +402,7 @@ timed_callback(void *baton) static int drive_serf(request_rec *r, serf_config_t *conf) { apr_status_t rv; - apr_pool_t *pool = r->pool; + apr_pool_t *pool; apr_sockaddr_t *address; s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t)); /* XXXXX: make persistent/per-process or something.*/ @@ -366,7 +412,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf) serf_server_config_t *ctx = (serf_server_config_t *)ap_get_module_config(r->server->module_config, &serf_module); - + + /* Allocate everything out of a subpool, with a shorter lifetime than + * the main request, so that we can cleanup safely and remove our events + * from the main serf context in the async mpm mode. + */ + apr_pool_create(&pool, r->pool); if (strcmp(conf->url.scheme, "cluster") == 0) { int rc; ap_serf_cluster_provider_t *cp; @@ -449,10 +500,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf) baton->r = r; baton->conf = conf; + baton->serf_pool = pool; baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL); baton->ssl_ctx = NULL; baton->rstatus = OK; + baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc); baton->done_headers = 0; baton->keep_reading = 1; @@ -472,14 +525,7 @@ static int drive_serf(request_rec *r, serf_config_t *conf) baton); if (mpm_supprts_serf) { - - rv = ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton); - - if (rv != APR_SUCCESS) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "ap_mpm_register_timed_callback failed."); - return HTTP_INTERNAL_SERVER_ERROR; - } - + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, "handing off serf request to mpm"); return SUSPENDED; } else { -- 2.40.0