]> granicus.if.org Git - apache/commitdiff
fixed bug in upload that triggered window_updates during session shutdown, disentangl...
authorStefan Eissing <icing@apache.org>
Wed, 23 Dec 2015 13:04:40 +0000 (13:04 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 23 Dec 2015 13:04:40 +0000 (13:04 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1721540 13f79535-47bb-0310-9956-ffa450edef68

18 files changed:
modules/http2/h2_conn.c
modules/http2/h2_conn.h
modules/http2/h2_h2.c
modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_request.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_task.c
modules/http2/h2_task.h
modules/http2/h2_task_input.c
modules/http2/h2_version.h
modules/http2/h2_worker.c
modules/http2/h2_worker.h
modules/http2/h2_workers.c
modules/http2/h2_workers.h

index f7048602143c7878671416b34a765dc0f258db1f..262748fc1b671e76f7caa2382343bf009f572929 100644 (file)
@@ -204,58 +204,53 @@ apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c)
 
 static void fix_event_conn(conn_rec *c, conn_rec *master);
 
-conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *pool)
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, 
+                          apr_thread_t *thread, apr_socket_t *socket)
 {
     conn_rec *c;
     
     AP_DEBUG_ASSERT(master);
-
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, master,
+                  "h2_conn(%ld): created from master", master->id);
+    
     /* This is like the slave connection creation from 2.5-DEV. A
      * very efficient way - not sure how compatible this is, since
      * the core hooks are no longer run.
      * But maybe it's is better this way, not sure yet.
      */
-    c = (conn_rec *) apr_palloc(pool, sizeof(conn_rec));
+    c = (conn_rec *) apr_palloc(p, sizeof(conn_rec));
     if (c == NULL) {
-        ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, master
                       APLOGNO(02913) "h2_task: creating conn");
         return NULL;
     }
     
     memcpy(c, master, sizeof(conn_rec));
-    c->id = (master->id & (long)pool);
-    c->master = master;
-    c->input_filters = NULL;
-    c->output_filters = NULL;
-    c->pool = pool;        
-    return c;
-}
-
-apr_status_t h2_slave_setup(h2_task *task, apr_bucket_alloc_t *bucket_alloc,
-                            apr_thread_t *thread, apr_socket_t *socket)
-{
-    conn_rec *master = task->mplx->c;
-    
-    ap_log_perror(APLOG_MARK, APLOG_TRACE3, 0, task->pool,
-                  "h2_conn(%ld): created from master", master->id);
-    
-    /* Ok, we are just about to start processing the connection and
-     * the worker is calling us to setup all necessary resources.
-     * We can borrow some from the worker itself and some we do as
-     * sub-resources from it, so that we get a nice reuse of
-     * pools.
-     */
-    task->c->pool = task->pool;
-    task->c->current_thread = thread;
-    task->c->bucket_alloc = bucket_alloc;
+           
+    /* Replace these */
+    c->id                     = (master->id & (long)p);
+    c->master                 = master;
+    c->pool                   = p;        
+    c->current_thread         = thread;
+    c->conn_config            = ap_create_conn_config(p);
+    c->notes                  = apr_table_make(p, 5);
+    c->input_filters          = NULL;
+    c->output_filters         = NULL;
+    c->bucket_alloc           = apr_bucket_alloc_create(p);
+    c->cs                     = NULL;
+    c->data_in_input_filters  = 0;
+    c->data_in_output_filters = 0;
+    c->clogging_input_filters = 1;
+    c->log                    = NULL;
+    c->log_id                 = NULL;
     
-    task->c->conn_config = ap_create_conn_config(task->pool);
-    task->c->notes = apr_table_make(task->pool, 5);
+    /* TODO: these should be unique to this thread */
+    c->sbh                    = master->sbh;
     
-    /* In order to do this in 2.4.x, we need to add a member to conn_rec */
-    task->c->master = master;
+    /* Simulate that we had already a request on this connection. */
+    c->keepalives             = 1;
     
-    ap_set_module_config(task->c->conn_config, &core_module, socket);
+    ap_set_module_config(c->conn_config, &core_module, socket);
     
     /* This works for mpm_worker so far. Other mpm modules have 
      * different needs, unfortunately. The most interesting one 
@@ -266,17 +261,14 @@ apr_status_t h2_slave_setup(h2_task *task, apr_bucket_alloc_t *bucket_alloc,
             /* all fine */
             break;
         case H2_MPM_EVENT: 
-            fix_event_conn(task->c, master);
+            fix_event_conn(c, master);
             break;
         default:
             /* fingers crossed */
             break;
     }
     
-    /* Simulate that we had already a request on this connection. */
-    task->c->keepalives = 1;
-    
-    return APR_SUCCESS;
+    return c;
 }
 
 /* This is an internal mpm event.c struct which is disguised
index d59eb46dd956ae3c8b30d02eb1fc14367d6a296c..66fa2e58cdf2d897088ef1bdc99a455220ca6b69 100644 (file)
@@ -56,9 +56,7 @@ typedef enum {
 h2_mpm_type_t h2_conn_mpm_type(void);
 
 
-conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool);
-
-apr_status_t h2_slave_setup(struct h2_task *task, apr_bucket_alloc_t *bucket_alloc,
-                            apr_thread_t *thread, apr_socket_t *socket);
+conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, 
+                          apr_thread_t *thread, apr_socket_t *socket);
 
 #endif /* defined(__mod_h2__h2_conn__) */
index b6a8f9f4d93f84f1f4fdff58d9a7c62fe93d9603..6217c83e6f4982aff5e597dcc1de03b3bda0f798 100644 (file)
@@ -673,7 +673,7 @@ static int h2_h2_post_read_req(request_rec *r)
             /* setup the correct output filters to process the response
              * on the proper mod_http2 way. */
             ap_log_rerror(APLOG_MARK, APLOG_TRACE3, 0, r, "adding task output filter");
-            if (task->serialize_headers) {
+            if (task->ser_headers) {
                 ap_add_output_filter("H1_TO_H2_RESP", task, r, r->connection);
             }
             else {
index c1d65fb9a1eb6a390d27b2d0470a3c7180924df9..2ff45b2c735429471e68b9932a1ee1d699f9a5a6 100644 (file)
 #include "h2_task.h"
 #include "h2_util.h"
 
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc)
+h2_io *h2_io_create(int id, apr_pool_t *pool)
 {
     h2_io *io = apr_pcalloc(pool, sizeof(*io));
     if (io) {
         io->id = id;
         io->pool = pool;
-        io->bucket_alloc = bucket_alloc;
+        io->bucket_alloc = apr_bucket_alloc_create(pool);
     }
     return io;
 }
index b89f8b87a60a33638e33e7ad04978bdefb10e7ed..22c71c21b1ab49d8fe545a6c419d4469b03710e5 100644 (file)
@@ -71,7 +71,7 @@ struct h2_io {
 /**
  * Creates a new h2_io for the given stream id. 
  */
-h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
+h2_io *h2_io_create(int id, apr_pool_t *pool);
 
 /**
  * Frees any resources hold by the h2_io instance. 
index bdcc2b5e4437c8ca01345cb78c89a5130c6c0f41..54c83fdf4dc986113903d75e0abb41f59ad9c7e6 100644 (file)
@@ -136,8 +136,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
         
-        m->bucket_alloc = apr_bucket_alloc_create(m->pool);
-        
         m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
@@ -266,6 +264,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
     workers_unregister(m);
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
+        /* disable WINDOW_UPDATE callbacks */
+        h2_mplx_set_consumed_cb(m, NULL, NULL);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterator until all h2_io have been orphaned or destroyed */
         }
@@ -901,15 +901,14 @@ static h2_io *open_io(h2_mplx *m, int stream_id)
         m->spare_pool = NULL;
     }
     
-    io = h2_io_create(stream_id, io_pool, m->bucket_alloc);
+    io = h2_io_create(stream_id, io_pool);
     h2_io_set_add(m->stream_ios, io);
     
     return io;
 }
 
 
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
-                             const h2_request *req, int eos, 
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, 
                              h2_stream_pri_cmp *cmp, void *ctx)
 {
     apr_status_t status;
@@ -922,9 +921,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
     if (APR_SUCCESS == status) {
         h2_io *io = open_io(m, stream_id);
         io->request = req;
-        io->request_body = !eos;
 
-        if (eos) {
+        if (!io->request->body) {
             status = h2_io_in_close(io);
         }
         
@@ -942,9 +940,9 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
     return status;
 }
 
-h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more)
+const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
 {
-    h2_task *task = NULL;
+    const h2_request *req = NULL;
     apr_status_t status;
     
     AP_DEBUG_ASSERT(m);
@@ -955,18 +953,15 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, h2_worker *w, int *has_more)
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         int sid;
-        while (!task && (sid = h2_tq_shift(m->q)) > 0) {
-            /* Anything not already setup correctly in the task
-             * needs to be so now, as task will be executed right about 
-             * when this method returns. */
+        while (!req && (sid = h2_tq_shift(m->q)) > 0) {
             h2_io *io = h2_io_set_get(m->stream_ios, sid);
             if (io) {
-                task = h2_worker_create_task(w, m, io->request, !io->request_body);
+                req = io->request;
             }
         }
         *has_more = !h2_tq_empty(m->q);
         apr_thread_mutex_unlock(m->lock);
     }
-    return task;
+    return req;
 }
 
index 5e4831c3f6762513432748b9997a1b6014c3c90d..cc791764ffbe19417149ae595f02e09cd721ee21 100644 (file)
@@ -44,7 +44,6 @@ struct h2_stream;
 struct h2_request;
 struct h2_io_set;
 struct apr_thread_cond_t;
-struct h2_worker;
 struct h2_workers;
 struct h2_stream_set;
 struct h2_task_queue;
@@ -65,7 +64,6 @@ struct h2_mplx {
     volatile int refs;
     conn_rec *c;
     apr_pool_t *pool;
-    apr_bucket_alloc_t *bucket_alloc;
 
     unsigned int aborted : 1;
 
@@ -165,12 +163,10 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
  * @param m the multiplexer
  * @param stream_id the identifier of the stream
  * @param r the request to be processed
- * @param eos if input is complete
  * @param cmp the stream priority compare function
  * @param ctx context data for the compare function
  */
-apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
-                             const struct h2_request *r, int eos, 
+apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const struct h2_request *r, 
                              h2_stream_pri_cmp *cmp, void *ctx);
 
 /**
@@ -182,7 +178,7 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id,
  */
 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
 
-struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more);
+const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more);
 
 /**
  * Register a callback for the amount of input data consumed per stream. The
index 17e4b23b6ecf9a2ab04480dfc21a113a80f4a363..561fed8dcf9423e7dba9aed9496817e781010896 100644 (file)
@@ -43,6 +43,7 @@ struct h2_request {
     
     unsigned int chunked : 1; /* iff requst body needs to be forwarded as chunked */
     unsigned int eoh     : 1; /* iff end-of-headers has been seen and request is complete */
+    unsigned int body    : 1; /* iff this request has a body */
     unsigned int push    : 1; /* iff server push is possible for this request */
     
     const struct h2_config *config;
index 6a3d26155cd8d2cc40e47cff26910199bca05824..d867e38cd4a0065da734093385cadf10526b4366 100644 (file)
@@ -684,6 +684,7 @@ static void h2_session_destroy(h2_session *session)
                       session->id, (int)h2_stream_set_size(session->streams));
     }
     if (session->mplx) {
+        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
         h2_mplx_release_and_join(session->mplx, session->iowait);
         session->mplx = NULL;
     }
index 45d4cd1422cd1f2c4339cb0d369ef2604e3f4ef7..95c5b4a157197dbbf92f3a0538ec17e8f809c1a5 100644 (file)
@@ -298,13 +298,14 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
                                     eos, push_enabled);
     if (status == APR_SUCCESS) {
         if (!eos) {
+            stream->request->body = 1;
             stream->bbin = apr_brigade_create(stream->pool, 
                                               stream->session->c->bucket_alloc);
         }
         stream->input_remaining = stream->request->content_length;
         
         status = h2_mplx_process(stream->session->mplx, stream->id, 
-                                 stream->request, eos, cmp, ctx);
+                                 stream->request, cmp, ctx);
         stream->scheduled = 1;
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
index 93d20ade7779cd265ee6c5bafe2d6e2270d92046..fb7ccc3a18eaa846a5b527d422b97273d9f1d5d8 100644 (file)
@@ -143,9 +143,9 @@ static int h2_task_pre_conn(conn_rec* c, void *arg)
 }
 
 h2_task *h2_task_create(long session_id, const h2_request *req, 
-                        apr_pool_t *pool, h2_mplx *mplx, int eos)
+                        apr_pool_t *pool, h2_mplx *mplx)
 {
-    h2_task *task = apr_pcalloc(pool, sizeof(h2_task));
+    h2_task *task     = apr_pcalloc(pool, sizeof(h2_task));
     if (task == NULL) {
         ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool,
                       APLOGNO(02941) "h2_task(%ld-%d): create stream task", 
@@ -154,72 +154,35 @@ h2_task *h2_task_create(long session_id, const h2_request *req,
         return NULL;
     }
     
-    task->id        = apr_psprintf(pool, "%ld-%d", session_id, req->id);
-    task->stream_id = req->id;
-    task->pool      = pool;
-    task->mplx      = mplx;
-    task->c         = h2_conn_create(mplx->c, task->pool);
+    task->id          = apr_psprintf(pool, "%ld-%d", session_id, req->id);
+    task->stream_id   = req->id;
+    task->mplx        = mplx;
+    task->request     = req;
+    task->input_eos   = !req->body;
+    task->ser_headers = h2_config_geti(req->config, H2_CONF_SER_HEADERS);
 
-    task->request   = req;
-    task->input_eos = eos;    
-    
     return task;
 }
 
-apr_status_t h2_task_destroy(h2_task *task)
-{
-    (void)task;
-    return APR_SUCCESS;
-}
-
-apr_status_t h2_task_do(h2_task *task, h2_worker *worker)
+apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond, 
+                        apr_socket_t *socket)
 {
-    apr_status_t status = APR_SUCCESS;
-    
     AP_DEBUG_ASSERT(task);
+    task->io = cond;
+    task->input = h2_task_input_create(task, c->pool, c->bucket_alloc);
+    task->output = h2_task_output_create(task, c->pool);
     
-    task->serialize_headers = h2_config_geti(task->request->config, H2_CONF_SER_HEADERS);
-
-    status = h2_worker_setup_task(worker, task);
+    ap_process_connection(c, socket);
     
-    /* save in connection that this one is a pseudo connection */
-    h2_ctx_create_for(task->c, task);
-
-    if (status == APR_SUCCESS) {
-        task->input = h2_task_input_create(task, task->pool, 
-                                           task->c->bucket_alloc);
-        task->output = h2_task_output_create(task, task->pool);
-        
-        ap_process_connection(task->c, h2_worker_get_socket(worker));
-        
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
-                      "h2_task(%s): processing done", task->id);
-    }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, task->c,
-                      APLOGNO(02957) "h2_task(%s): error setting up h2_task", 
-                      task->id);
-    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+                  "h2_task(%s): processing done", task->id);
     
-    if (task->input) {
-        h2_task_input_destroy(task->input);
-        task->input = NULL;
-    }
+    h2_task_input_destroy(task->input);
+    h2_task_output_close(task->output);
+    h2_task_output_destroy(task->output);
+    task->io = NULL;
     
-    if (task->output) {
-        h2_task_output_close(task->output);
-        h2_task_output_destroy(task->output);
-        task->output = NULL;
-    }
-
-    if (task->io) {
-        apr_thread_cond_signal(task->io);
-    }
-    
-    h2_worker_release_task(worker, task);
-    h2_mplx_task_done(task->mplx, task->stream_id);
-    
-    return status;
+    return APR_SUCCESS;
 }
 
 static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c)
@@ -261,7 +224,7 @@ static int h2_task_process_conn(conn_rec* c)
     
     ctx = h2_ctx_get(c, 0);
     if (h2_ctx_is_task(ctx)) {
-        if (!ctx->task->serialize_headers) {
+        if (!ctx->task->ser_headers) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, 
                           "h2_h2, processing request directly");
             h2_task_process_request(ctx->task->request, c);
index c0aff2c82473ad7a7feaae01b3db455e1db11942..08ce6c093a8c40d67158f290b991cac54e87941d 100644 (file)
@@ -48,30 +48,23 @@ typedef struct h2_task h2_task;
 struct h2_task {
     const char *id;
     int stream_id;
-    apr_pool_t *pool;
-    apr_bucket_alloc_t *bucket_alloc;
-
     struct h2_mplx *mplx;    
-    struct conn_rec *c;
     const struct h2_request *request;
     
-    unsigned int filters_set       : 1;
-    unsigned int input_eos         : 1;
-    unsigned int serialize_headers : 1;
+    unsigned int filters_set : 1;
+    unsigned int input_eos   : 1;
+    unsigned int ser_headers : 1;
     
     struct h2_task_input *input;
     struct h2_task_output *output;
-    
     struct apr_thread_cond_t *io;   /* used to wait for events on */
 };
 
 h2_task *h2_task_create(long session_id, const struct h2_request *req, 
-                        apr_pool_t *pool, struct h2_mplx *mplx, 
-                        int eos);
-
-apr_status_t h2_task_destroy(h2_task *task);
+                        apr_pool_t *pool, struct h2_mplx *mplx);
 
-apr_status_t h2_task_do(h2_task *task, struct h2_worker *worker);
+apr_status_t h2_task_do(h2_task *task, conn_rec *c, 
+                        struct apr_thread_cond_t *cond, apr_socket_t *socket);
 
 void h2_task_register_hooks(void);
 
index 921d03394e7bcda20c2bdc489d095681301f5093..992e956982d9ee0d776c5f6b1c458ca8d98526a9 100644 (file)
@@ -51,8 +51,8 @@ h2_task_input *h2_task_input_create(h2_task *task, apr_pool_t *pool,
         input->task = task;
         input->bb = NULL;
         
-        if (task->serialize_headers) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
+        if (task->ser_headers) {
+            ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool,
                           "h2_task_input(%s): serialize request %s %s", 
                           task->id, task->request->method, task->request->path);
             input->bb = apr_brigade_create(pool, bucket_alloc);
index b85d43618daa2a12dd0520fb97c18fb6a8e2775f..2637ff2fc526b600bc7a4e104c7adb88e34ece42 100644 (file)
@@ -20,7 +20,7 @@
  * @macro
  * Version number of the h2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.0.13-DEVa"
+#define MOD_HTTP2_VERSION "1.0.14-DEVa"
 
 /**
  * @macro
@@ -28,7 +28,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x01000d
+#define MOD_HTTP2_VERSION_NUM 0x01000e
 
 
 #endif /* mod_h2_h2_version_h */
index 54f0450c4413a52a0f4fff3b160cbcb88acb3e1e..8f988192a569a2d3c1a82c4e8550c906390ba603 100644 (file)
@@ -24,6 +24,8 @@
 
 #include "h2_private.h"
 #include "h2_conn.h"
+#include "h2_ctx.h"
+#include "h2_h2.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
 #include "h2_task.h"
@@ -34,6 +36,11 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
     h2_worker *worker = (h2_worker *)wctx;
     apr_status_t status = APR_SUCCESS;
     h2_mplx *m;
+    const h2_request *req;
+    h2_task *task;
+    conn_rec *c, *master;
+    int stream_id;
+    
     (void)thread;
     
     /* Furthermore, other code might want to see the socket for
@@ -50,15 +57,32 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
         return NULL;
     }
     
-    worker->task = NULL;
     m = NULL;
     while (!worker->aborted) {
-        status = worker->get_next(worker, &m, &worker->task, worker->ctx);
+        status = worker->get_next(worker, &m, &req, worker->ctx);
         
-        if (worker->task) {            
-            h2_task_do(worker->task, worker);
-            worker->task = NULL;
-            apr_thread_cond_signal(worker->io);
+        if (req) {
+            stream_id = req->id;
+            master = m->c;
+            c = h2_slave_create(master, worker->task_pool, 
+                                worker->thread, worker->socket);
+            if (!c) {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
+                              APLOGNO(02957) "h2_task(%s): error setting up slave connection", 
+                              task->id);
+                h2_mplx_out_rst(m, task->stream_id, H2_ERR_INTERNAL_ERROR);
+            }
+            else {
+                task = h2_task_create(m->id, req, worker->task_pool, m);
+                h2_ctx_create_for(c, task);
+                h2_task_do(task, c, worker->io, worker->socket);
+                
+                apr_thread_cond_signal(worker->io);
+            }
+            apr_pool_clear(worker->task_pool);
+            /* task is gone */
+            task = NULL;
+            h2_mplx_task_done(m, stream_id);
         }
     }
 
@@ -124,6 +148,7 @@ h2_worker *h2_worker_create(int id,
         }
         
         apr_pool_pre_cleanup_register(w->pool, w, cleanup_join_thread);
+        apr_pool_create(&w->task_pool, w->pool);
         apr_thread_create(&w->thread, attr, execute, w, w->pool);
     }
     return w;
@@ -158,52 +183,12 @@ int h2_worker_is_aborted(h2_worker *worker)
 }
 
 h2_task *h2_worker_create_task(h2_worker *worker, h2_mplx *m, 
-                               const h2_request *req, int eos)
+                               const h2_request *req)
 {
     h2_task *task;
     
-    /* Create a subpool from the worker one to be used for all things
-     * with life-time of this task execution.
-     */
-    if (!worker->task_pool) {
-        apr_pool_create(&worker->task_pool, worker->pool);
-        worker->pool_reuses = 100;
-    }
-    task = h2_task_create(m->id, req, worker->task_pool, m, eos);
-    
-    /* Link the task to the worker which provides useful things such
-     * as mutex, a socket etc. */
-    task->io = worker->io;
-    
+    task = h2_task_create(m->id, req, worker->task_pool, m);
     return task;
 }
 
-apr_status_t h2_worker_setup_task(h2_worker *worker, h2_task *task) {
-    apr_status_t status;
-    
-    
-    status = h2_slave_setup(task, apr_bucket_alloc_create(task->pool),
-                            worker->thread, worker->socket);
-    
-    return status;
-}
-
-void h2_worker_release_task(h2_worker *worker, struct h2_task *task)
-{
-    task->io = NULL;
-    task->pool = NULL;
-    if (worker->pool_reuses-- <= 0) {
-        apr_pool_destroy(worker->task_pool);
-        worker->task_pool = NULL;
-    }
-    else {
-        apr_pool_clear(worker->task_pool);
-    }
-}
-
-apr_socket_t *h2_worker_get_socket(h2_worker *worker)
-{
-    return worker->socket;
-}
-
 
index 6bf9bf31c0b7836709bb815db1211544b89c8abd..fc0f359eb8ab7ecb5108d20ac4b52b2ad509da1b 100644 (file)
@@ -31,7 +31,7 @@ typedef struct h2_worker h2_worker;
  * gets aborted (idle timeout, for example). */
 typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker,
                                             struct h2_mplx **pm,
-                                            struct h2_task **ptask,
+                                            const struct h2_request **preq,
                                             void *ctx);
 
 /* Invoked just before the worker thread exits. */
@@ -54,8 +54,6 @@ struct h2_worker {
     void *ctx;
     
     unsigned int aborted : 1;
-    int pool_reuses;
-    struct h2_task *task;
 };
 
 /**
@@ -145,10 +143,6 @@ int h2_worker_get_id(h2_worker *worker);
 int h2_worker_is_aborted(h2_worker *worker);
 
 struct h2_task *h2_worker_create_task(h2_worker *worker, struct h2_mplx *m, 
-                                      const struct h2_request *req, int eos);
-apr_status_t h2_worker_setup_task(h2_worker *worker, struct h2_task *task);
-void h2_worker_release_task(h2_worker *worker, struct h2_task *task);
-
-apr_socket_t *h2_worker_get_socket(h2_worker *worker);
-
+                                      const struct h2_request *req);
+                                      
 #endif /* defined(__mod_h2__h2_worker__) */
index 7aec7947491a3a81c6f286a49caaaf5dc91caee3..89aa4efdf104ab8621def45b5c9b4d6d177db6c7 100644 (file)
@@ -25,7 +25,7 @@
 
 #include "h2_private.h"
 #include "h2_mplx.h"
-#include "h2_task.h"
+#include "h2_request.h"
 #include "h2_task_queue.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
@@ -68,20 +68,20 @@ static void cleanup_zombies(h2_workers *workers, int lock)
  * the h2_workers lock.
  */
 static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, 
-                                  h2_task **ptask, void *ctx)
+                                  const h2_request **preq, void *ctx)
 {
     apr_status_t status;
     h2_mplx *m = NULL;
-    h2_task *task = NULL;
+    const h2_request *req = NULL;
     apr_time_t max_wait, start_wait;
     int has_more = 0;
     h2_workers *workers = (h2_workers *)ctx;
     
-    if (*pm && ptask != NULL) {
+    if (*pm && preq != NULL) {
         /* We have a h2_mplx instance and the worker wants the next task. 
          * Try to get one from the given mplx. */
-        *ptask = h2_mplx_pop_task(*pm, worker, &has_more);
-        if (*ptask) {
+        *preq = h2_mplx_pop_request(*pm, &has_more);
+        if (*preq) {
             return APR_SUCCESS;
         }
     }
@@ -94,7 +94,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
         *pm = NULL;
     }
     
-    if (!ptask) {
+    if (!preq) {
         /* the worker does not want a next task, we're done.
          */
         return APR_SUCCESS;
@@ -109,7 +109,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
         ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
                      "h2_worker(%d): looking for work", h2_worker_get_id(worker));
         
-        while (!task && !h2_worker_is_aborted(worker) && !workers->aborted) {
+        while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) {
             
             /* Get the next h2_mplx to process that has a task to hand out.
              * If it does, place it at the end of the queu and return the
@@ -121,12 +121,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
              * we do a timed wait or block indefinitely.
              */
             m = NULL;
-            while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
+            while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
                 m = H2_MPLX_LIST_FIRST(&workers->mplxs);
                 H2_MPLX_REMOVE(m);
                 
-                task = h2_mplx_pop_task(m, worker, &has_more);
-                if (task) {
+                req = h2_mplx_pop_request(m, &has_more);
+                if (req) {
                     if (has_more) {
                         H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
                     }
@@ -137,7 +137,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
                 }
             }
             
-            if (!task) {
+            if (!req) {
                 /* Need to wait for either a new mplx to arrive.
                  */
                 cleanup_zombies(workers, 0);
@@ -174,16 +174,16 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
         /* Here, we either have gotten task and mplx for the worker or
          * needed to give up with more than enough workers.
          */
-        if (task) {
+        if (req) {
             ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
-                         "h2_worker(%d): start task(%s)",
-                         h2_worker_get_id(worker), task->id);
+                         "h2_worker(%d): start request(%ld-%d)",
+                         h2_worker_get_id(worker), m->id, req->id);
             /* Since we hand out a reference to the worker, we increase
              * its ref count.
              */
             h2_mplx_reference(m);
             *pm = m;
-            *ptask = task;
+            *preq = req;
             
             if (has_more && workers->idle_worker_count > 1) {
                 apr_thread_cond_signal(workers->mplx_added);
index f79d5cac07f7d42095b57c595a1e267114bfc467..16ec4443b7e4d54f13aa82bf8a0dfbfc16cd3a44 100644 (file)
@@ -25,6 +25,7 @@
 struct apr_thread_mutex_t;
 struct apr_thread_cond_t;
 struct h2_mplx;
+struct h2_request;
 struct h2_task;
 struct h2_task_queue;