]> granicus.if.org Git - apache/blob - modules/http2/h2_mplx.c
merged latest changes in 2.4.x
[apache] / modules / http2 / h2_mplx.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18
19 #include <apr_atomic.h>
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22 #include <apr_strings.h>
23 #include <apr_time.h>
24
25 #include <httpd.h>
26 #include <http_core.h>
27 #include <http_log.h>
28
29 #include "h2_private.h"
30 #include "h2_config.h"
31 #include "h2_conn.h"
32 #include "h2_io.h"
33 #include "h2_io_set.h"
34 #include "h2_response.h"
35 #include "h2_mplx.h"
36 #include "h2_request.h"
37 #include "h2_stream.h"
38 #include "h2_stream_set.h"
39 #include "h2_task.h"
40 #include "h2_task_input.h"
41 #include "h2_task_output.h"
42 #include "h2_task_queue.h"
43 #include "h2_workers.h"
44
45
46 static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
47     AP_DEBUG_ASSERT(m);
48     if (m->aborted) {
49         *pstatus = APR_ECONNABORTED;
50         return 1;
51     }
52     return 0;
53 }
54
55 static void have_out_data_for(h2_mplx *m, int stream_id);
56
57 static void h2_mplx_destroy(h2_mplx *m)
58 {
59     AP_DEBUG_ASSERT(m);
60     m->aborted = 1;
61     if (m->q) {
62         h2_tq_destroy(m->q);
63         m->q = NULL;
64     }
65     if (m->ready_ios) {
66         h2_io_set_destroy(m->ready_ios);
67         m->ready_ios = NULL;
68     }
69     if (m->stream_ios) {
70         h2_io_set_destroy(m->stream_ios);
71         m->stream_ios = NULL;
72     }
73     
74     if (m->lock) {
75         apr_thread_mutex_destroy(m->lock);
76         m->lock = NULL;
77     }
78     
79     if (m->pool) {
80         apr_pool_destroy(m->pool);
81     }
82 }
83
84 /**
85  * A h2_mplx needs to be thread-safe *and* if will be called by
86  * the h2_session thread *and* the h2_worker threads. Therefore:
87  * - calls are protected by a mutex lock, m->lock
88  * - the pool needs its own allocator, since apr_allocator_t are 
89  *   not re-entrant. The separate allocator works without a 
90  *   separate lock since we already protect h2_mplx itself.
91  *   Since HTTP/2 connections can be expected to live longer than
92  *   their HTTP/1 cousins, the separate allocator seems to work better
93  *   than protecting a shared h2_session one with an own lock.
94  */
95 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
96 {
97     apr_status_t status = APR_SUCCESS;
98     h2_config *conf = h2_config_get(c);
99     apr_allocator_t *allocator = NULL;
100     h2_mplx *m;
101     AP_DEBUG_ASSERT(conf);
102     
103     status = apr_allocator_create(&allocator);
104     if (status != APR_SUCCESS) {
105         return NULL;
106     }
107
108     m = apr_pcalloc(parent, sizeof(h2_mplx));
109     if (m) {
110         m->id = c->id;
111         APR_RING_ELEM_INIT(m, link);
112         apr_atomic_set32(&m->refs, 1);
113         m->c = c;
114         apr_pool_create_ex(&m->pool, parent, NULL, allocator);
115         if (!m->pool) {
116             return NULL;
117         }
118         apr_allocator_owner_set(allocator, m->pool);
119         
120         status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
121                                          m->pool);
122         if (status != APR_SUCCESS) {
123             h2_mplx_destroy(m);
124             return NULL;
125         }
126         
127         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
128         
129         m->q = h2_tq_create(m->id, m->pool);
130         m->stream_ios = h2_io_set_create(m->pool);
131         m->ready_ios = h2_io_set_create(m->pool);
132         m->closed = h2_stream_set_create(m->pool);
133         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
134         m->workers = workers;
135         
136         m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
137     }
138     return m;
139 }
140
141 static void reference(h2_mplx *m)
142 {
143     apr_atomic_inc32(&m->refs);
144 }
145
146 static void release(h2_mplx *m)
147 {
148     if (!apr_atomic_dec32(&m->refs)) {
149         if (m->join_wait) {
150             apr_thread_cond_signal(m->join_wait);
151         }
152     }
153 }
154
155 void h2_mplx_reference(h2_mplx *m)
156 {
157     reference(m);
158 }
159 void h2_mplx_release(h2_mplx *m)
160 {
161     release(m);
162 }
163
164 static void workers_register(h2_mplx *m) {
165     /* Initially, there was ref count increase for this as well, but
166      * this is not needed, even harmful.
167      * h2_workers is only a hub for all the h2_worker instances.
168      * At the end-of-life of this h2_mplx, we always unregister at
169      * the workers. The thing to manage are all the h2_worker instances
170      * out there. Those may hold a reference to this h2_mplx and we cannot
171      * call them to unregister.
172      * 
173      * Therefore: ref counting for h2_workers in not needed, ref counting
174      * for h2_worker using this is critical.
175      */
176     h2_workers_register(m->workers, m);
177 }
178
179 static void workers_unregister(h2_mplx *m) {
180     h2_workers_unregister(m->workers, m);
181 }
182
183 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
184 {
185     apr_status_t status;
186     workers_unregister(m);
187
188     status = apr_thread_mutex_lock(m->lock);
189     if (APR_SUCCESS == status) {
190         int attempts = 0;
191         
192         release(m);
193         while (apr_atomic_read32(&m->refs) > 0) {
194             m->join_wait = wait;
195             ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), 
196                           0, m->c,
197                           "h2_mplx(%ld): release_join, refs=%d, waiting...", 
198                           m->id, m->refs);
199             apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
200             if (++attempts >= 6) {
201                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
202                               APLOGNO(02952) 
203                               "h2_mplx(%ld): join attempts exhausted, refs=%d", 
204                               m->id, m->refs);
205                 break;
206             }
207         }
208         if (m->join_wait) {
209             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
210                           "h2_mplx(%ld): release_join -> destroy", m->id);
211         }
212         m->join_wait = NULL;
213         apr_thread_mutex_unlock(m->lock);
214         h2_mplx_destroy(m);
215     }
216     return status;
217 }
218
219 void h2_mplx_abort(h2_mplx *m)
220 {
221     apr_status_t status;
222     AP_DEBUG_ASSERT(m);
223     status = apr_thread_mutex_lock(m->lock);
224     if (APR_SUCCESS == status) {
225         m->aborted = 1;
226         h2_io_set_destroy_all(m->stream_ios);
227         apr_thread_mutex_unlock(m->lock);
228     }
229     workers_unregister(m);
230 }
231
232
233 h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
234 {
235     h2_stream *stream = NULL;
236     apr_status_t status; 
237     h2_io *io;
238
239     if (m->aborted) {
240         return NULL;
241     }
242     status = apr_thread_mutex_lock(m->lock);
243     if (APR_SUCCESS == status) {
244         apr_pool_t *stream_pool = m->spare_pool;
245         
246         if (!stream_pool) {
247             apr_pool_create(&stream_pool, m->pool);
248         }
249         else {
250             m->spare_pool = NULL;
251         }
252         
253         stream = h2_stream_create(stream_id, stream_pool, m);
254         stream->state = H2_STREAM_ST_OPEN;
255         
256         io = h2_io_set_get(m->stream_ios, stream_id);
257         if (!io) {
258             io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
259             h2_io_set_add(m->stream_ios, io);
260         }
261         status = io? APR_SUCCESS : APR_ENOMEM;
262         apr_thread_mutex_unlock(m->lock);
263     }
264     return stream;
265 }
266
267 static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
268 {
269     apr_pool_t *pool = h2_stream_detach_pool(stream);
270     if (pool) {
271         apr_pool_clear(pool);
272         if (m->spare_pool) {
273             apr_pool_destroy(m->spare_pool);
274         }
275         m->spare_pool = pool;
276     }
277     h2_stream_destroy(stream);
278     if (io) {
279         /* The pool is cleared/destroyed which also closes all
280          * allocated file handles. Give this count back to our
281          * file handle pool. */
282         m->file_handles_allowed += io->files_handles_owned;
283         h2_io_set_remove(m->stream_ios, io);
284         h2_io_destroy(io);
285     }
286 }
287
288 apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
289 {
290     apr_status_t status;
291     AP_DEBUG_ASSERT(m);
292     status = apr_thread_mutex_lock(m->lock);
293     if (APR_SUCCESS == status) {
294         h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
295         if (!io || io->task_done) {
296             /* No more io or task already done -> cleanup immediately */
297             stream_destroy(m, stream, io);
298         }
299         else {
300             /* Add stream to closed set for cleanup when task is done */
301             h2_stream_set_add(m->closed, stream);
302         }
303         apr_thread_mutex_unlock(m->lock);
304     }
305     return status;
306 }
307
308 void h2_mplx_task_done(h2_mplx *m, int stream_id)
309 {
310     apr_status_t status = apr_thread_mutex_lock(m->lock);
311     if (APR_SUCCESS == status) {
312         h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
313         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
314         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
315                       "h2_mplx(%ld): task(%d) done", m->id, stream_id);
316         if (stream) {
317             /* stream was already closed by main connection and is in 
318              * zombie state. Now that the task is done with it, we
319              * can free its resources. */
320             h2_stream_set_remove(m->closed, stream);
321             stream_destroy(m, stream, io);
322         }
323         else if (io) {
324             /* main connection has not finished stream. Mark task as done
325              * so that eventual cleanup can start immediately. */
326             io->task_done = 1;
327         }
328         apr_thread_mutex_unlock(m->lock);
329     }
330 }
331
332 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
333                              int stream_id, apr_bucket_brigade *bb,
334                              struct apr_thread_cond_t *iowait)
335 {
336     apr_status_t status; 
337     AP_DEBUG_ASSERT(m);
338     if (m->aborted) {
339         return APR_ECONNABORTED;
340     }
341     status = apr_thread_mutex_lock(m->lock);
342     if (APR_SUCCESS == status) {
343         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
344         if (io) {
345             io->input_arrived = iowait;
346             status = h2_io_in_read(io, bb, 0);
347             while (status == APR_EAGAIN 
348                    && !is_aborted(m, &status)
349                    && block == APR_BLOCK_READ) {
350                 apr_thread_cond_wait(io->input_arrived, m->lock);
351                 status = h2_io_in_read(io, bb, 0);
352             }
353             io->input_arrived = NULL;
354         }
355         else {
356             status = APR_EOF;
357         }
358         apr_thread_mutex_unlock(m->lock);
359     }
360     return status;
361 }
362
363 apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
364                               apr_bucket_brigade *bb)
365 {
366     apr_status_t status;
367     AP_DEBUG_ASSERT(m);
368     if (m->aborted) {
369         return APR_ECONNABORTED;
370     }
371     status = apr_thread_mutex_lock(m->lock);
372     if (APR_SUCCESS == status) {
373         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
374         if (io) {
375             status = h2_io_in_write(io, bb);
376             if (io->input_arrived) {
377                 apr_thread_cond_signal(io->input_arrived);
378             }
379         }
380         else {
381             status = APR_EOF;
382         }
383         apr_thread_mutex_unlock(m->lock);
384     }
385     return status;
386 }
387
388 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
389 {
390     apr_status_t status;
391     AP_DEBUG_ASSERT(m);
392     if (m->aborted) {
393         return APR_ECONNABORTED;
394     }
395     status = apr_thread_mutex_lock(m->lock);
396     if (APR_SUCCESS == status) {
397         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
398         if (io) {
399             status = h2_io_in_close(io);
400             if (io->input_arrived) {
401                 apr_thread_cond_signal(io->input_arrived);
402             }
403         }
404         else {
405             status = APR_ECONNABORTED;
406         }
407         apr_thread_mutex_unlock(m->lock);
408     }
409     return status;
410 }
411
412 typedef struct {
413     h2_mplx_consumed_cb *cb;
414     void *cb_ctx;
415     int streams_updated;
416 } update_ctx;
417
418 static int update_window(void *ctx, h2_io *io)
419 {
420     if (io->input_consumed) {
421         update_ctx *uctx = (update_ctx*)ctx;
422         uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
423         io->input_consumed = 0;
424         ++uctx->streams_updated;
425     }
426     return 1;
427 }
428
429 apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
430                                        h2_mplx_consumed_cb *cb, void *cb_ctx)
431 {
432     apr_status_t status;
433     AP_DEBUG_ASSERT(m);
434     if (m->aborted) {
435         return APR_ECONNABORTED;
436     }
437     status = apr_thread_mutex_lock(m->lock);
438     if (APR_SUCCESS == status) {
439         update_ctx ctx;
440         
441         ctx.cb              = cb;
442         ctx.cb_ctx          = cb_ctx;
443         ctx.streams_updated = 0;
444
445         status = APR_EAGAIN;
446         h2_io_set_iter(m->stream_ios, update_window, &ctx);
447         
448         if (ctx.streams_updated) {
449             status = APR_SUCCESS;
450         }
451         apr_thread_mutex_unlock(m->lock);
452     }
453     return status;
454 }
455
456 apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, 
457                                h2_io_data_cb *cb, void *ctx, 
458                                apr_size_t *plen, int *peos)
459 {
460     apr_status_t status;
461     AP_DEBUG_ASSERT(m);
462     if (m->aborted) {
463         return APR_ECONNABORTED;
464     }
465     status = apr_thread_mutex_lock(m->lock);
466     if (APR_SUCCESS == status) {
467         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
468         if (io) {
469             status = h2_io_out_readx(io, cb, ctx, plen, peos);
470             if (status == APR_SUCCESS && io->output_drained) {
471                 apr_thread_cond_signal(io->output_drained);
472             }
473         }
474         else {
475             status = APR_ECONNABORTED;
476         }
477         apr_thread_mutex_unlock(m->lock);
478     }
479     return status;
480 }
481
482 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
483 {
484     apr_status_t status;
485     h2_stream *stream = NULL;
486     AP_DEBUG_ASSERT(m);
487     if (m->aborted) {
488         return NULL;
489     }
490     status = apr_thread_mutex_lock(m->lock);
491     if (APR_SUCCESS == status) {
492         h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
493         if (io) {
494             h2_response *response = io->response;
495             h2_io_set_remove(m->ready_ios, io);
496             
497             stream = h2_stream_set_get(streams, response->stream_id);
498             if (stream) {
499                 h2_stream_set_response(stream, response, io->bbout);
500                 if (io->output_drained) {
501                     apr_thread_cond_signal(io->output_drained);
502                 }
503             }
504             else {
505                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
506                               APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
507                               m->id, response->stream_id);
508             }
509         }
510         apr_thread_mutex_unlock(m->lock);
511     }
512     return stream;
513 }
514
515 static apr_status_t out_write(h2_mplx *m, h2_io *io, 
516                               ap_filter_t* f, apr_bucket_brigade *bb,
517                               struct apr_thread_cond_t *iowait)
518 {
519     apr_status_t status = APR_SUCCESS;
520     /* We check the memory footprint queued for this stream_id
521      * and block if it exceeds our configured limit.
522      * We will not split buckets to enforce the limit to the last
523      * byte. After all, the bucket is already in memory.
524      */
525     while (!APR_BRIGADE_EMPTY(bb) 
526            && (status == APR_SUCCESS)
527            && !is_aborted(m, &status)) {
528         
529         status = h2_io_out_write(io, bb, m->stream_max_mem, 
530                                  &m->file_handles_allowed);
531         
532         /* Wait for data to drain until there is room again */
533         while (!APR_BRIGADE_EMPTY(bb) 
534                && iowait
535                && status == APR_SUCCESS
536                && (m->stream_max_mem <= h2_io_out_length(io))
537                && !is_aborted(m, &status)) {
538             io->output_drained = iowait;
539             if (f) {
540                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
541                               "h2_mplx(%ld-%d): waiting for out drain", 
542                               m->id, io->id);
543             }
544             apr_thread_cond_wait(io->output_drained, m->lock);
545             io->output_drained = NULL;
546         }
547     }
548     apr_brigade_cleanup(bb);
549     return status;
550 }
551
552 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
553                              ap_filter_t* f, apr_bucket_brigade *bb,
554                              struct apr_thread_cond_t *iowait)
555 {
556     apr_status_t status = APR_SUCCESS;
557     
558     h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
559     if (io) {
560         if (f) {
561             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
562                           "h2_mplx(%ld-%d): open response: %s",
563                           m->id, stream_id, response->status);
564         }
565         
566         io->response = h2_response_copy(io->pool, response);
567         h2_io_set_add(m->ready_ios, io);
568         if (bb) {
569             status = out_write(m, io, f, bb, iowait);
570         }
571         have_out_data_for(m, stream_id);
572     }
573     else {
574         status = APR_ECONNABORTED;
575     }
576     return status;
577 }
578
579 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
580                               ap_filter_t* f, apr_bucket_brigade *bb,
581                               struct apr_thread_cond_t *iowait)
582 {
583     apr_status_t status;
584     AP_DEBUG_ASSERT(m);
585     if (m->aborted) {
586         return APR_ECONNABORTED;
587     }
588     status = apr_thread_mutex_lock(m->lock);
589     if (APR_SUCCESS == status) {
590         status = out_open(m, stream_id, response, f, bb, iowait);
591         if (m->aborted) {
592             return APR_ECONNABORTED;
593         }
594         apr_thread_mutex_unlock(m->lock);
595     }
596     return status;
597 }
598
599
600 apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
601                                ap_filter_t* f, apr_bucket_brigade *bb,
602                                struct apr_thread_cond_t *iowait)
603 {
604     apr_status_t status;
605     AP_DEBUG_ASSERT(m);
606     if (m->aborted) {
607         return APR_ECONNABORTED;
608     }
609     status = apr_thread_mutex_lock(m->lock);
610     if (APR_SUCCESS == status) {
611         if (!m->aborted) {
612             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
613             if (io) {
614                 status = out_write(m, io, f, bb, iowait);
615                 have_out_data_for(m, stream_id);
616                 if (m->aborted) {
617                     return APR_ECONNABORTED;
618                 }
619             }
620             else {
621                 status = APR_ECONNABORTED;
622             }
623         }
624         
625         if (m->lock) {
626             apr_thread_mutex_unlock(m->lock);
627         }
628     }
629     return status;
630 }
631
632 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
633 {
634     apr_status_t status;
635     AP_DEBUG_ASSERT(m);
636     if (m->aborted) {
637         return APR_ECONNABORTED;
638     }
639     status = apr_thread_mutex_lock(m->lock);
640     if (APR_SUCCESS == status) {
641         if (!m->aborted) {
642             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
643             if (io) {
644                 if (!io->response->ngheader) {
645                     /* In case a close comes before a response was created,
646                      * insert an error one so that our streams can properly
647                      * reset.
648                      */
649                     h2_response *r = h2_response_create(stream_id, 
650                                                         "500", NULL, m->pool);
651                     status = out_open(m, stream_id, r, NULL, NULL, NULL);
652                 }
653                 status = h2_io_out_close(io);
654                 have_out_data_for(m, stream_id);
655                 if (m->aborted) {
656                     /* if we were the last output, the whole session might
657                      * have gone down in the meantime.
658                      */
659                     return APR_SUCCESS;
660                 }
661             }
662             else {
663                 status = APR_ECONNABORTED;
664             }
665         }
666         apr_thread_mutex_unlock(m->lock);
667     }
668     return status;
669 }
670
671 int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
672 {
673     int has_eos = 0;
674     apr_status_t status;
675     AP_DEBUG_ASSERT(m);
676     if (m->aborted) {
677         return 0;
678     }
679     status = apr_thread_mutex_lock(m->lock);
680     if (APR_SUCCESS == status) {
681         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
682         if (io) {
683             has_eos = h2_io_in_has_eos_for(io);
684         }
685         apr_thread_mutex_unlock(m->lock);
686     }
687     return has_eos;
688 }
689
690 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
691 {
692     apr_status_t status;
693     int has_data = 0;
694     AP_DEBUG_ASSERT(m);
695     if (m->aborted) {
696         return 0;
697     }
698     status = apr_thread_mutex_lock(m->lock);
699     if (APR_SUCCESS == status) {
700         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
701         if (io) {
702             has_data = h2_io_out_has_data(io);
703         }
704         apr_thread_mutex_unlock(m->lock);
705     }
706     return has_data;
707 }
708
709 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
710                                  apr_thread_cond_t *iowait)
711 {
712     apr_status_t status;
713     AP_DEBUG_ASSERT(m);
714     if (m->aborted) {
715         return APR_ECONNABORTED;
716     }
717     status = apr_thread_mutex_lock(m->lock);
718     if (APR_SUCCESS == status) {
719         m->added_output = iowait;
720         status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
721         if (APLOGctrace2(m->c)) {
722             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
723                           "h2_mplx(%ld): trywait on data for %f ms)",
724                           m->id, timeout/1000.0);
725         }
726         m->added_output = NULL;
727         apr_thread_mutex_unlock(m->lock);
728     }
729     return status;
730 }
731
732 static void have_out_data_for(h2_mplx *m, int stream_id)
733 {
734     (void)stream_id;
735     AP_DEBUG_ASSERT(m);
736     if (m->added_output) {
737         apr_thread_cond_signal(m->added_output);
738     }
739 }
740
741 apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
742 {
743     apr_status_t status;
744     AP_DEBUG_ASSERT(m);
745     if (m->aborted) {
746         return APR_ECONNABORTED;
747     }
748     status = apr_thread_mutex_lock(m->lock);
749     if (APR_SUCCESS == status) {
750         /* TODO: needs to sort queue by priority */
751         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
752                       "h2_mplx: do task(%s)", task->id);
753         h2_tq_append(m->q, task);
754         apr_thread_mutex_unlock(m->lock);
755     }
756     workers_register(m);
757     return status;
758 }
759
760 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
761 {
762     h2_task *task = NULL;
763     apr_status_t status;
764     AP_DEBUG_ASSERT(m);
765     if (m->aborted) {
766         *has_more = 0;
767         return NULL;
768     }
769     status = apr_thread_mutex_lock(m->lock);
770     if (APR_SUCCESS == status) {
771         task = h2_tq_pop_first(m->q);
772         if (task) {
773             h2_task_set_started(task);
774         }
775         *has_more = !h2_tq_empty(m->q);
776         apr_thread_mutex_unlock(m->lock);
777     }
778     return task;
779 }
780
781 apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
782 {
783     apr_status_t status;
784     AP_DEBUG_ASSERT(m);
785     if (m->aborted) {
786         return APR_ECONNABORTED;
787     }
788     status = apr_thread_mutex_lock(m->lock);
789     if (APR_SUCCESS == status) {
790         conn_rec *c = h2_conn_create(m->c, stream->pool);
791         stream->task = h2_task_create(m->id, stream->id, 
792                                       stream->pool, m, c);
793         
794         apr_thread_mutex_unlock(m->lock);
795     }
796     return status;
797 }
798