From cb0e5db86f05e4851a7b85d24abece63297bbf7b Mon Sep 17 00:00:00 2001 From: Brian Pane Date: Mon, 24 Oct 2005 03:33:14 +0000 Subject: [PATCH] Async write completion for Event MPM (backported from async-dev branch to 2.3 trunk) git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@327945 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 2 + modules/http/http_core.c | 20 ++---- modules/http/http_request.c | 51 +++++++------- server/mpm/experimental/event/event.c | 97 +++++++++++++++++++++++---- 4 files changed, 114 insertions(+), 56 deletions(-) diff --git a/CHANGES b/CHANGES index 942a8e5a9b..e5d9600916 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,8 @@ Changes with Apache 2.3.0 [Remove entries to the current 2.0 and 2.2 section below, when backported] + *) Asynchronous write completion for the Event MPM. [Brian Pane] + *) Added an End-Of-Request bucket type. The logging of a request and the freeing of its pool are now done when the EOR bucket is destroyed. This has the effect of delaying the logging until right after the last diff --git a/modules/http/http_core.c b/modules/http/http_core.c index 1a76bc941a..fad09ef190 100644 --- a/modules/http/http_core.c +++ b/modules/http/http_core.c @@ -122,26 +122,18 @@ static int ap_process_http_async_connection(conn_rec *c) /* process the request if it was read without error */ ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r); - if (r->status == HTTP_OK) - ap_process_request(r); + if (r->status == HTTP_OK) { + cs->state = CONN_STATE_HANDLER; + ap_process_async_request(r); + } if (ap_extended_status) ap_increment_counts(c->sbh, r); - if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted - || ap_graceful_stop_signalled()) { + if (cs->state != CONN_STATE_WRITE_COMPLETION) { + /* Something went wrong; close the connection */ cs->state = CONN_STATE_LINGER; } - else if (!c->data_in_input_filters) { - cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE; - } - else { - /* else we are pipelining. Stay in READ_REQUEST_LINE state - * and stay in the loop - */ - cs->state = CONN_STATE_READ_REQUEST_LINE; - } - } else { /* ap_read_request failed - client may have closed */ cs->state = CONN_STATE_LINGER; diff --git a/modules/http/http_request.c b/modules/http/http_request.c index 07f37a070a..43abefc0eb 100644 --- a/modules/http/http_request.c +++ b/modules/http/http_request.c @@ -191,47 +191,23 @@ AP_DECLARE(void) ap_die(int type, request_rec *r) ap_send_error_response(r_1st_err, recursive_error); } -static void check_pipeline_flush(conn_rec *c) +static void check_pipeline(conn_rec *c) { - apr_bucket *e; - apr_bucket_brigade *bb; - - /* ### if would be nice if we could PEEK without a brigade. that would - ### allow us to defer creation of the brigade to when we actually - ### need to send a FLUSH. */ - bb = apr_brigade_create(c->pool, c->bucket_alloc); - - /* Flush the filter contents if: - * - * 1) the connection will be closed - * 2) there isn't a request ready to be read - */ /* ### is zero correct? that means "read one line" */ if (c->keepalive != AP_CONN_CLOSE) { + apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc); if (ap_get_brigade(c->input_filters, bb, AP_MODE_EATCRLF, APR_NONBLOCK_READ, 0) != APR_SUCCESS) { c->data_in_input_filters = 0; /* we got APR_EOF or an error */ } else { c->data_in_input_filters = 1; - return; /* don't flush */ } } - - e = apr_bucket_flush_create(c->bucket_alloc); - - /* We just send directly to the connection based filters. At - * this point, we know that we have seen all of the data - * (request finalization sent an EOS bucket, which empties all - * of the request filters). We just want to flush the buckets - * if something hasn't been sent to the network yet. - */ - APR_BRIGADE_INSERT_HEAD(bb, e); - ap_pass_brigade(c->output_filters, bb); } -void ap_process_request(request_rec *r) +void ap_process_async_request(request_rec *r) { int access_status; apr_bucket_brigade *bb; @@ -289,11 +265,30 @@ void ap_process_request(request_rec *r) */ c->cs->state = CONN_STATE_WRITE_COMPLETION; - check_pipeline_flush(c); + check_pipeline(c); if (ap_extended_status) ap_time_process_request(c->sbh, STOP_PREQUEST); } +void ap_process_request(request_rec *r) +{ + apr_bucket_brigade *bb; + apr_bucket *b; + conn_rec *c = r->connection; + + ap_process_async_request(r); + + if (!c->data_in_input_filters) { + bb = apr_brigade_create(c->pool, c->bucket_alloc); + b = apr_bucket_flush_create(c->bucket_alloc); + APR_BRIGADE_INSERT_HEAD(bb, b); + ap_pass_brigade(c->output_filters, bb); + } + if (ap_extended_status) { + ap_time_process_request(c->sbh, STOP_PREQUEST); + } +} + static apr_table_t *rename_original_env(apr_pool_t *p, apr_table_t *t) { const apr_array_header_t *env_arr = apr_table_elts(t); diff --git a/server/mpm/experimental/event/event.c b/server/mpm/experimental/event/event.c index 72ba228b09..f508cfeb22 100644 --- a/server/mpm/experimental/event/event.c +++ b/server/mpm/experimental/event/event.c @@ -164,7 +164,7 @@ static int sick_child_detected; apr_thread_mutex_t *timeout_mutex; APR_RING_HEAD(timeout_head_t, conn_state_t); -static struct timeout_head_t timeout_head; +static struct timeout_head_t timeout_head, keepalive_timeout_head; static apr_pollset_t *event_pollset; @@ -592,6 +592,7 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock, pt->status = 1; pt->baton = cs; cs->pfd.client_data = pt; + APR_RING_ELEM_INIT(cs, timeout_list); ap_update_vhost_given_ip(c); @@ -621,8 +622,10 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock, else { c = cs->c; c->sbh = sbh; + pt = cs->pfd.client_data; } +read_request: if (cs->state == CONN_STATE_READ_REQUEST_LINE) { if (!c->aborted) { ap_run_process_connection(c); @@ -636,6 +639,49 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock, cs->state = CONN_STATE_LINGER; } } + + if (cs->state == CONN_STATE_WRITE_COMPLETION) { + /* For now, do blocking writes in this thread to transfer the + * rest of the response. TODO: Hand off this connection to a + * pollset for asynchronous write completion. + */ + ap_filter_t *output_filter = c->output_filters; + apr_status_t rv; + while (output_filter->next != NULL) { + output_filter = output_filter->next; + } + rv = output_filter->frec->filter_func.out_func(output_filter, NULL); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, + "network write failure in core output filter"); + cs->state = CONN_STATE_LINGER; + } + else if (c->data_in_output_filters) { + /* Still in WRITE_COMPLETION_STATE: + * Set a write timeout for this connection, and let the + * event thread poll for writeability. + */ + cs->expiration_time = ap_server_conf->timeout + time_now; + apr_thread_mutex_lock(timeout_mutex); + APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list); + apr_thread_mutex_unlock(timeout_mutex); + pt->status = 0; + cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR; + rc = apr_pollset_add(event_pollset, &cs->pfd); + return 1; + } + else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || + ap_graceful_stop_signalled()) { + c->cs->state = CONN_STATE_LINGER; + } + else if (c->data_in_input_filters) { + cs->state = CONN_STATE_READ_REQUEST_LINE; + goto read_request; + } + else { + cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE; + } + } if (cs->state == CONN_STATE_LINGER) { ap_lingering_close(c); @@ -658,11 +704,12 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock, */ cs->expiration_time = ap_server_conf->keep_alive_timeout + time_now; apr_thread_mutex_lock(timeout_mutex); - APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list); + APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list); apr_thread_mutex_unlock(timeout_mutex); pt->status = 0; - /* Add work to pollset. These are always read events */ + /* Add work to pollset. */ + cs->pfd.reqevents = APR_POLLIN; rc = apr_pollset_add(event_pollset, &cs->pfd); if (rc != APR_SUCCESS) { @@ -839,11 +886,12 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) } APR_RING_INIT(&timeout_head, conn_state_t, timeout_list); + APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list); /* Create the main pollset */ rc = apr_pollset_create(&event_pollset, ap_threads_per_child, - tpool, APR_POLLSET_THREADSAFE); + tpool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "apr_pollset_create with Thread Safety failed. " @@ -853,19 +901,19 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) } for (lr = ap_listeners; lr != NULL; lr = lr->next) { - apr_pollfd_t pfd = { 0 }; + apr_pollfd_t *pfd = apr_palloc(tpool, sizeof(*pfd)); pt = apr_pcalloc(tpool, sizeof(*pt)); - pfd.desc_type = APR_POLL_SOCKET; - pfd.desc.s = lr->sd; - pfd.reqevents = APR_POLLIN; + pfd->desc_type = APR_POLL_SOCKET; + pfd->desc.s = lr->sd; + pfd->reqevents = APR_POLLIN; pt->type = PT_ACCEPT; pt->baton = lr; - pfd.client_data = pt; + pfd->client_data = pt; - apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1); - apr_pollset_add(event_pollset, &pfd); + apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1); + apr_pollset_add(event_pollset, pfd); } /* Unblock the signal used to wake this thread up, and set a handler for @@ -907,6 +955,8 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) case CONN_STATE_CHECK_REQUEST_LINE_READABLE: cs->state = CONN_STATE_READ_REQUEST_LINE; break; + case CONN_STATE_WRITE_COMPLETION: + break; default: ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, @@ -918,6 +968,7 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) apr_thread_mutex_lock(timeout_mutex); APR_RING_REMOVE(cs, timeout_list); apr_thread_mutex_unlock(timeout_mutex); + APR_RING_ELEM_INIT(cs, timeout_list); rc = push2worker(out_pfd, event_pollset); if (rc != APR_SUCCESS) { @@ -1002,9 +1053,10 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) /* handle timed out sockets */ apr_thread_mutex_lock(timeout_mutex); - cs = APR_RING_FIRST(&timeout_head); + /* Step 1: keepalive timeouts */ + cs = APR_RING_FIRST(&keepalive_timeout_head); timeout_time = time_now + TIMEOUT_FUDGE_FACTOR; - while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list) + while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list) && cs->expiration_time < timeout_time && get_worker(&have_idle_worker)) { @@ -1023,8 +1075,25 @@ static void *listener_thread(apr_thread_t * thd, void *dummy) */ } have_idle_worker = 0; + cs = APR_RING_FIRST(&keepalive_timeout_head); + } + + /* Step 2: write completion timeouts */ + cs = APR_RING_FIRST(&timeout_head); + while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list) + && cs->expiration_time < timeout_time + && get_worker(&have_idle_worker)) { + + cs->state = CONN_STATE_LINGER; + APR_RING_REMOVE(cs, timeout_list); + rc = push2worker(&cs->pfd, event_pollset); + if (rc != APR_SUCCESS) { + return NULL; + } + have_idle_worker = 0; cs = APR_RING_FIRST(&timeout_head); } + apr_thread_mutex_unlock(timeout_mutex); } /* listener main loop */ @@ -2132,7 +2201,7 @@ static int worker_pre_config(apr_pool_t * pconf, apr_pool_t * plog, if (restart_num++ == 1) { is_graceful = 0; rv = apr_pollset_create(&event_pollset, 1, plog, - APR_POLLSET_THREADSAFE); + APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, "Couldn't create a Thread Safe Pollset. " -- 2.40.0