From: Justin Erenkrantz Date: Wed, 14 Nov 2007 19:49:05 +0000 (+0000) Subject: Amsterdam sandbox: add serf input/output filters that replace the core filters. X-Git-Tag: 2.3.0~1261 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d0404fe34faab406a626000c2e6f23bc7cf8b67f;p=apache Amsterdam sandbox: add serf input/output filters that replace the core filters. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@595022 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/proxy/mod_serf.c b/modules/proxy/mod_serf.c index 7dfabb9ac0..af632cbe72 100644 --- a/modules/proxy/mod_serf.c +++ b/modules/proxy/mod_serf.c @@ -21,6 +21,8 @@ #include "http_log.h" #include "serf.h" +#include "serf_bucket_types.h" +#include "serf_bucket_util.h" #include "apr_uri.h" module AP_MODULE_DECLARE_DATA serf_module; @@ -370,6 +372,207 @@ static int drive_serf(request_rec *r, serf_config_rec *conf) return baton.rstatus; } +typedef struct { + serf_context_t *serf_ctx; + serf_bucket_alloc_t *serf_bkt_alloc; + serf_bucket_t *serf_in_bucket; + serf_bucket_t *serf_out_bucket; + apr_bucket_brigade *out_brigade; + apr_bucket_brigade *tmp_brigade; + apr_status_t serf_bucket_status; +} serf_core_ctx_t; + +typedef struct { + apr_pool_t *pool; + serf_bucket_alloc_t *allocator; + serf_core_ctx_t *core_ctx; + apr_bucket_brigade *bb; + apr_bucket_brigade *tmp_bb; +} brigade_bucket_ctx_t; + +/* Forward-declare */ +const serf_bucket_type_t serf_bucket_type_brigade; + +static serf_bucket_t * brigade_create(ap_filter_t *f, serf_core_ctx_t *core_ctx) +{ + brigade_bucket_ctx_t *ctx; + + ctx = serf_bucket_mem_alloc(core_ctx->serf_bkt_alloc, sizeof(*ctx)); + ctx->allocator = core_ctx->serf_bkt_alloc; + ctx->pool = serf_bucket_allocator_get_pool(ctx->allocator); + ctx->core_ctx = core_ctx; + ctx->bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc); + ctx->tmp_bb = apr_brigade_create(f->c->pool, f->c->bucket_alloc); + + return serf_bucket_create(&serf_bucket_type_brigade, ctx->allocator, ctx); +} + +static apr_status_t brigade_read(serf_bucket_t *bucket, + apr_size_t requested, + const char **data, apr_size_t *len) +{ + brigade_bucket_ctx_t *ctx = bucket->data; + apr_status_t status; + apr_bucket *b, *end, *f; + + b = APR_BRIGADE_FIRST(ctx->bb); + status = apr_bucket_read(b, data, len, APR_BLOCK_READ); + + if (requested < *len) { + *len = requested; + } + status = apr_brigade_partition(ctx->bb, *len, &end); + f = APR_BRIGADE_FIRST(ctx->bb); + while (f != end && f != APR_BRIGADE_SENTINEL(ctx->bb)) { + apr_bucket_delete(f); + f = APR_BRIGADE_FIRST(ctx->bb); + } + return status; +} + +static apr_status_t brigade_readline(serf_bucket_t *bucket, + int acceptable, int *found, + const char **data, apr_size_t *len) +{ + brigade_bucket_ctx_t *ctx = bucket->data; + apr_status_t status; + + status = apr_brigade_split_line(ctx->tmp_bb, ctx->bb, + APR_BLOCK_READ, HUGE_STRING_LEN); + if (APR_STATUS_IS_EAGAIN(status)) { + if (found) { + *found = SERF_NEWLINE_NONE; + } + status = APR_SUCCESS; + } + apr_brigade_pflatten(ctx->bb, data, len, ctx->pool); + return status; +} + +static apr_status_t brigade_peek(serf_bucket_t *bucket, + const char **data, + apr_size_t *len) +{ + return APR_ENOTIMPL; +} + +static void brigade_destroy(serf_bucket_t *bucket) +{ + serf_default_destroy_and_data(bucket); +} + +const serf_bucket_type_t serf_bucket_type_brigade = { + brigade_read, + brigade_readline, + serf_default_read_iovec, + serf_default_read_for_sendfile, + serf_default_read_bucket, + brigade_peek, + brigade_destroy, +}; + +static serf_core_ctx_t* init_ctx(ap_filter_t *f, apr_socket_t *socket) +{ + serf_core_ctx_t *ctx; + + ctx = apr_pcalloc(f->c->pool, sizeof(*ctx)); + + ctx->serf_ctx = serf_context_create(f->c->pool); + ctx->serf_bkt_alloc = serf_bucket_allocator_create(f->c->pool, NULL, NULL); + ctx->serf_in_bucket = serf_bucket_socket_create(socket, + ctx->serf_bkt_alloc); + ctx->serf_out_bucket = serf_bucket_aggregate_create(ctx->serf_bkt_alloc); + + ctx->out_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc); + ctx->tmp_brigade = apr_brigade_create(f->c->pool, f->c->bucket_alloc); + + return ctx; +} + +static int serf_input_filter(ap_filter_t *f, apr_bucket_brigade *bb, + ap_input_mode_t mode, apr_read_type_e block, + apr_off_t readbytes) +{ + apr_status_t status; + core_net_rec *net = f->ctx; + serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx; + + if (mode == AP_MODE_INIT) { + return APR_SUCCESS; + } + if (!ctx) + { + ctx = init_ctx(f, net->client_socket); + } + + if (mode == AP_MODE_GETLINE) { + const char *data; + apr_size_t len; + int found; + apr_bucket *b; + + ctx->serf_bucket_status = serf_bucket_readline(ctx->serf_in_bucket, + SERF_NEWLINE_ANY, + &found, &data, &len); + b = apr_bucket_transient_create(data, len, f->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + return APR_SUCCESS; + } + if (mode == AP_MODE_READBYTES) { + const char *data; + apr_size_t len; + apr_bucket *b; + + ctx->serf_bucket_status = serf_bucket_read(ctx->serf_in_bucket, + readbytes, &data, &len); + b = apr_bucket_transient_create(data, len, f->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + return APR_SUCCESS; + } + + if (mode == AP_MODE_EATCRLF || mode == AP_MODE_EXHAUSTIVE || + mode == AP_MODE_SPECULATIVE) { + abort(); + } +} + +static apr_status_t serf_output_filter(ap_filter_t *f, + apr_bucket_brigade *new_bb) +{ + conn_rec *c = f->c; + core_net_rec *net = f->ctx; + serf_core_ctx_t *ctx = (serf_core_ctx_t*)net->in_ctx; + if (!ctx) { + ctx = init_ctx(f, net->client_socket); + } + + ap_save_brigade(f, &ctx->tmp_brigade, &new_bb, c->pool); + apr_brigade_destroy(new_bb); + APR_BRIGADE_CONCAT(ctx->out_brigade, ctx->tmp_brigade); + + return APR_SUCCESS; +} + +static ap_filter_rec_t *serf_input_filter_handle; +static ap_filter_rec_t *serf_output_filter_handle; + +static int serf_pre_connection(conn_rec *c, void *csd) +{ + core_net_rec *net = apr_palloc(c->pool, sizeof(*net)); + apr_status_t status; + + net->c = c; + net->in_ctx = NULL; + net->out_ctx = NULL; + net->client_socket = csd; + + ap_set_module_config(net->c->conn_config, &serf_module, csd); + ap_add_input_filter_handle(serf_input_filter_handle, net, NULL, net->c); + ap_add_output_filter_handle(serf_output_filter_handle, net, NULL, net->c); + + return DONE; +} + static int serf_handler(request_rec *r) { serf_config_rec *conf = ap_get_module_config(r->per_dir_config, @@ -424,7 +627,16 @@ static const command_rec serf_cmds[] = static void register_hooks(apr_pool_t *p) { + ap_hook_pre_connection(serf_pre_connection, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST); + + serf_input_filter_handle = + ap_register_input_filter("SERF_IN", serf_input_filter, NULL, + AP_FTYPE_NETWORK); + serf_output_filter_handle = + ap_register_output_filter("SERF_OUT", serf_output_filter, NULL, + AP_FTYPE_NETWORK); + } module AP_MODULE_DECLARE_DATA serf_module =