]> granicus.if.org Git - apache/commitdiff
Async write completion for Event MPM
authorBrian Pane <brianp@apache.org>
Mon, 24 Oct 2005 03:33:14 +0000 (03:33 +0000)
committerBrian Pane <brianp@apache.org>
Mon, 24 Oct 2005 03:33:14 +0000 (03:33 +0000)
(backported from async-dev branch to 2.3 trunk)

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@327945 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http/http_core.c
modules/http/http_request.c
server/mpm/experimental/event/event.c

diff --git a/CHANGES b/CHANGES
index 942a8e5a9b429050ff420915c68aff2f158b8549..e5d9600916711cd2139d84dd195416ed5db5c2fc 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,8 @@
 Changes with Apache 2.3.0
   [Remove entries to the current 2.0 and 2.2 section below, when backported]
 
+  *) Asynchronous write completion for the Event MPM.  [Brian Pane]
+
   *) Added an End-Of-Request bucket type.  The logging of a request and
      the freeing of its pool are now done when the EOR bucket is destroyed.
      This has the effect of delaying the logging until right after the last
index 1a76bc941aaded7bbf322c04794f05e37f448058..fad09ef1909a646086c1e8b09cf8e75cc9faaf1b 100644 (file)
@@ -122,26 +122,18 @@ static int ap_process_http_async_connection(conn_rec *c)
             /* process the request if it was read without error */
                                                        
             ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r);
-            if (r->status == HTTP_OK)
-                ap_process_request(r);
+            if (r->status == HTTP_OK) {
+                cs->state = CONN_STATE_HANDLER;
+                ap_process_async_request(r);
+            }
 
             if (ap_extended_status)
                 ap_increment_counts(c->sbh, r);
 
-            if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted 
-                    || ap_graceful_stop_signalled()) {
+            if (cs->state != CONN_STATE_WRITE_COMPLETION) {
+                /* Something went wrong; close the connection */
                 cs->state = CONN_STATE_LINGER;
             }
-            else if (!c->data_in_input_filters) {
-                cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
-            }
-            else {
-                /* else we are pipelining.  Stay in READ_REQUEST_LINE state
-                 *  and stay in the loop
-                 */
-                cs->state = CONN_STATE_READ_REQUEST_LINE;
-            }
-
         }
         else {   /* ap_read_request failed - client may have closed */
             cs->state = CONN_STATE_LINGER;
index 07f37a070aaac3afb81dc19392bcc71886dc5ca2..43abefc0ebea85795dfb95fbe0edd08ad32fdc13 100644 (file)
@@ -191,47 +191,23 @@ AP_DECLARE(void) ap_die(int type, request_rec *r)
     ap_send_error_response(r_1st_err, recursive_error);
 }
 
-static void check_pipeline_flush(conn_rec *c)
+static void check_pipeline(conn_rec *c)
 {
-    apr_bucket *e;
-    apr_bucket_brigade *bb;
-
-    /* ### if would be nice if we could PEEK without a brigade. that would
-       ### allow us to defer creation of the brigade to when we actually
-       ### need to send a FLUSH. */
-    bb = apr_brigade_create(c->pool, c->bucket_alloc);
-
-    /* Flush the filter contents if:
-     *
-     *   1) the connection will be closed
-     *   2) there isn't a request ready to be read
-     */
     /* ### is zero correct? that means "read one line" */
     if (c->keepalive != AP_CONN_CLOSE) {
+        apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc);
         if (ap_get_brigade(c->input_filters, bb, AP_MODE_EATCRLF, 
                        APR_NONBLOCK_READ, 0) != APR_SUCCESS) {
             c->data_in_input_filters = 0;  /* we got APR_EOF or an error */
         }
         else {
             c->data_in_input_filters = 1;
-            return;    /* don't flush */
         }
     }
-
-        e = apr_bucket_flush_create(c->bucket_alloc);
-
-        /* We just send directly to the connection based filters.  At
-         * this point, we know that we have seen all of the data
-         * (request finalization sent an EOS bucket, which empties all
-         * of the request filters). We just want to flush the buckets
-         * if something hasn't been sent to the network yet.
-         */
-        APR_BRIGADE_INSERT_HEAD(bb, e);
-        ap_pass_brigade(c->output_filters, bb);
 }
 
 
-void ap_process_request(request_rec *r)
+void ap_process_async_request(request_rec *r)
 {
     int access_status;
     apr_bucket_brigade *bb;
@@ -289,11 +265,30 @@ void ap_process_request(request_rec *r)
      */
 
     c->cs->state = CONN_STATE_WRITE_COMPLETION;
-    check_pipeline_flush(c);
+    check_pipeline(c);
     if (ap_extended_status)
         ap_time_process_request(c->sbh, STOP_PREQUEST);
 }
 
+void ap_process_request(request_rec *r)
+{
+    apr_bucket_brigade *bb;
+    apr_bucket *b;
+    conn_rec *c = r->connection;
+
+    ap_process_async_request(r);
+
+    if (!c->data_in_input_filters) {
+        bb = apr_brigade_create(c->pool, c->bucket_alloc);
+        b = apr_bucket_flush_create(c->bucket_alloc);
+        APR_BRIGADE_INSERT_HEAD(bb, b);
+        ap_pass_brigade(c->output_filters, bb);
+    }
+    if (ap_extended_status) {
+        ap_time_process_request(c->sbh, STOP_PREQUEST);
+    }
+}
+
 static apr_table_t *rename_original_env(apr_pool_t *p, apr_table_t *t)
 {
     const apr_array_header_t *env_arr = apr_table_elts(t);
index 72ba228b095f5c23f8d11c247ed08970cd512532..f508cfeb22b2bc8b9d7541822150553bb2d96975 100644 (file)
@@ -164,7 +164,7 @@ static int sick_child_detected;
 
 apr_thread_mutex_t *timeout_mutex;
 APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head;
+static struct timeout_head_t timeout_head, keepalive_timeout_head;
 
 static apr_pollset_t *event_pollset;
 
@@ -592,6 +592,7 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
         pt->status = 1;
         pt->baton = cs;
         cs->pfd.client_data = pt;
+        APR_RING_ELEM_INIT(cs, timeout_list);
 
         ap_update_vhost_given_ip(c);
 
@@ -621,8 +622,10 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
     else {
         c = cs->c;
         c->sbh = sbh;
+        pt = cs->pfd.client_data;
     }
 
+read_request:
     if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
         if (!c->aborted) {
             ap_run_process_connection(c);
@@ -636,6 +639,49 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
             cs->state = CONN_STATE_LINGER;
         }
     }
+    
+    if (cs->state == CONN_STATE_WRITE_COMPLETION) {
+        /* For now, do blocking writes in this thread to transfer the
+         * rest of the response.  TODO: Hand off this connection to a
+         * pollset for asynchronous write completion.
+         */
+        ap_filter_t *output_filter = c->output_filters;
+        apr_status_t rv;
+        while (output_filter->next != NULL) {
+            output_filter = output_filter->next;
+        }
+        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf,
+                     "network write failure in core output filter");
+            cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_output_filters) {
+            /* Still in WRITE_COMPLETION_STATE:
+             * Set a write timeout for this connection, and let the
+             * event thread poll for writeability.
+             */
+            cs->expiration_time = ap_server_conf->timeout + time_now;
+            apr_thread_mutex_lock(timeout_mutex);
+            APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+            apr_thread_mutex_unlock(timeout_mutex);
+            pt->status = 0;
+            cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
+            rc = apr_pollset_add(event_pollset, &cs->pfd);
+            return 1;
+        }
+        else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted || 
+            ap_graceful_stop_signalled()) {
+            c->cs->state = CONN_STATE_LINGER;
+        }
+        else if (c->data_in_input_filters) {
+            cs->state = CONN_STATE_READ_REQUEST_LINE;
+            goto read_request;
+        }
+        else {
+            cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
+        }
+    }
 
     if (cs->state == CONN_STATE_LINGER) {
         ap_lingering_close(c);
@@ -658,11 +704,12 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
          */
         cs->expiration_time = ap_server_conf->keep_alive_timeout + time_now;
         apr_thread_mutex_lock(timeout_mutex);
-        APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+        APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
         apr_thread_mutex_unlock(timeout_mutex);
 
         pt->status = 0;
-        /* Add work to pollset. These are always read events */
+        /* Add work to pollset. */
+        cs->pfd.reqevents = APR_POLLIN;
         rc = apr_pollset_add(event_pollset, &cs->pfd);
 
         if (rc != APR_SUCCESS) {
@@ -839,11 +886,12 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
     }
 
     APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
+    APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);
 
     /* Create the main pollset */
     rc = apr_pollset_create(&event_pollset,
                             ap_threads_per_child,
-                            tpool, APR_POLLSET_THREADSAFE);
+                            tpool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
     if (rc != APR_SUCCESS) {
         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                      "apr_pollset_create with Thread Safety failed. "
@@ -853,19 +901,19 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
     }
 
     for (lr = ap_listeners; lr != NULL; lr = lr->next) {
-        apr_pollfd_t pfd = { 0 };
+        apr_pollfd_t *pfd = apr_palloc(tpool, sizeof(*pfd));
         pt = apr_pcalloc(tpool, sizeof(*pt));
-        pfd.desc_type = APR_POLL_SOCKET;
-        pfd.desc.s = lr->sd;
-        pfd.reqevents = APR_POLLIN;
+        pfd->desc_type = APR_POLL_SOCKET;
+        pfd->desc.s = lr->sd;
+        pfd->reqevents = APR_POLLIN;
 
         pt->type = PT_ACCEPT;
         pt->baton = lr;
 
-        pfd.client_data = pt;
+        pfd->client_data = pt;
 
-        apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1);
-        apr_pollset_add(event_pollset, &pfd);
+        apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1);
+        apr_pollset_add(event_pollset, pfd);
     }
 
     /* Unblock the signal used to wake this thread up, and set a handler for
@@ -907,6 +955,8 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                 case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
                     cs->state = CONN_STATE_READ_REQUEST_LINE;
                     break;
+                case CONN_STATE_WRITE_COMPLETION:
+                    break;
                 default:
                     ap_log_error(APLOG_MARK, APLOG_ERR, rc,
                                  ap_server_conf,
@@ -918,6 +968,7 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                 apr_thread_mutex_lock(timeout_mutex);
                 APR_RING_REMOVE(cs, timeout_list);
                 apr_thread_mutex_unlock(timeout_mutex);
+                APR_RING_ELEM_INIT(cs, timeout_list);
 
                 rc = push2worker(out_pfd, event_pollset);
                 if (rc != APR_SUCCESS) {
@@ -1002,9 +1053,10 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
         /* handle timed out sockets */
         apr_thread_mutex_lock(timeout_mutex);
 
-        cs = APR_RING_FIRST(&timeout_head);
+        /* Step 1: keepalive timeouts */
+        cs = APR_RING_FIRST(&keepalive_timeout_head);
         timeout_time = time_now + TIMEOUT_FUDGE_FACTOR;
-        while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+        while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
                && cs->expiration_time < timeout_time
                && get_worker(&have_idle_worker)) {
 
@@ -1023,8 +1075,25 @@ static void *listener_thread(apr_thread_t * thd, void *dummy)
                  */
             }
             have_idle_worker = 0;
+            cs = APR_RING_FIRST(&keepalive_timeout_head);
+        }
+
+        /* Step 2: write completion timeouts */
+        cs = APR_RING_FIRST(&timeout_head);
+        while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+               && cs->expiration_time < timeout_time
+               && get_worker(&have_idle_worker)) {
+
+            cs->state = CONN_STATE_LINGER;
+            APR_RING_REMOVE(cs, timeout_list);
+            rc = push2worker(&cs->pfd, event_pollset);
+            if (rc != APR_SUCCESS) {
+                return NULL;
+            }
+            have_idle_worker = 0;
             cs = APR_RING_FIRST(&timeout_head);
         }
+
         apr_thread_mutex_unlock(timeout_mutex);
 
     }     /* listener main loop */
@@ -2132,7 +2201,7 @@ static int worker_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     if (restart_num++ == 1) {
         is_graceful = 0;
         rv = apr_pollset_create(&event_pollset, 1, plog,
-                                APR_POLLSET_THREADSAFE);
+                                APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
         if (rv != APR_SUCCESS) {
             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
                          "Couldn't create a Thread Safe Pollset. "