]> granicus.if.org Git - apache/blob - modules/http2/h2_mplx.c
avoiding race where h2_stream is closed/reset by client while being unprocessed in...
[apache] / modules / http2 / h2_mplx.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18
19 #include <apr_atomic.h>
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22 #include <apr_strings.h>
23 #include <apr_time.h>
24
25 #include <httpd.h>
26 #include <http_core.h>
27 #include <http_log.h>
28
29 #include "h2_private.h"
30 #include "h2_config.h"
31 #include "h2_conn.h"
32 #include "h2_io.h"
33 #include "h2_io_set.h"
34 #include "h2_response.h"
35 #include "h2_mplx.h"
36 #include "h2_request.h"
37 #include "h2_stream.h"
38 #include "h2_stream_set.h"
39 #include "h2_task.h"
40 #include "h2_task_input.h"
41 #include "h2_task_output.h"
42 #include "h2_task_queue.h"
43 #include "h2_workers.h"
44
45
46 static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
47     AP_DEBUG_ASSERT(m);
48     if (m->aborted) {
49         *pstatus = APR_ECONNABORTED;
50         return 1;
51     }
52     return 0;
53 }
54
55 static void have_out_data_for(h2_mplx *m, int stream_id);
56
57 static void h2_mplx_destroy(h2_mplx *m)
58 {
59     AP_DEBUG_ASSERT(m);
60     m->aborted = 1;
61     if (m->q) {
62         h2_tq_destroy(m->q);
63         m->q = NULL;
64     }
65     if (m->ready_ios) {
66         h2_io_set_destroy(m->ready_ios);
67         m->ready_ios = NULL;
68     }
69     if (m->stream_ios) {
70         h2_io_set_destroy(m->stream_ios);
71         m->stream_ios = NULL;
72     }
73     
74     if (m->lock) {
75         apr_thread_mutex_destroy(m->lock);
76         m->lock = NULL;
77     }
78     
79     if (m->pool) {
80         apr_pool_destroy(m->pool);
81     }
82 }
83
84 /**
85  * A h2_mplx needs to be thread-safe *and* if will be called by
86  * the h2_session thread *and* the h2_worker threads. Therefore:
87  * - calls are protected by a mutex lock, m->lock
88  * - the pool needs its own allocator, since apr_allocator_t are 
89  *   not re-entrant. The separate allocator works without a 
90  *   separate lock since we already protect h2_mplx itself.
91  *   Since HTTP/2 connections can be expected to live longer than
92  *   their HTTP/1 cousins, the separate allocator seems to work better
93  *   than protecting a shared h2_session one with an own lock.
94  */
95 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
96 {
97     apr_status_t status = APR_SUCCESS;
98     h2_config *conf = h2_config_get(c);
99     apr_allocator_t *allocator = NULL;
100     h2_mplx *m;
101     AP_DEBUG_ASSERT(conf);
102     
103     status = apr_allocator_create(&allocator);
104     if (status != APR_SUCCESS) {
105         return NULL;
106     }
107
108     m = apr_pcalloc(parent, sizeof(h2_mplx));
109     if (m) {
110         m->id = c->id;
111         APR_RING_ELEM_INIT(m, link);
112         apr_atomic_set32(&m->refs, 1);
113         m->c = c;
114         apr_pool_create_ex(&m->pool, parent, NULL, allocator);
115         if (!m->pool) {
116             return NULL;
117         }
118         apr_allocator_owner_set(allocator, m->pool);
119         
120         status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
121                                          m->pool);
122         if (status != APR_SUCCESS) {
123             h2_mplx_destroy(m);
124             return NULL;
125         }
126         
127         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
128         
129         m->q = h2_tq_create(m->id, m->pool);
130         m->stream_ios = h2_io_set_create(m->pool);
131         m->ready_ios = h2_io_set_create(m->pool);
132         m->closed = h2_stream_set_create(m->pool);
133         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
134         m->workers = workers;
135         
136         m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
137     }
138     return m;
139 }
140
141 static void reference(h2_mplx *m)
142 {
143     apr_atomic_inc32(&m->refs);
144 }
145
146 static void release(h2_mplx *m)
147 {
148     if (!apr_atomic_dec32(&m->refs)) {
149         if (m->join_wait) {
150             apr_thread_cond_signal(m->join_wait);
151         }
152     }
153 }
154
155 void h2_mplx_reference(h2_mplx *m)
156 {
157     reference(m);
158 }
159 void h2_mplx_release(h2_mplx *m)
160 {
161     release(m);
162 }
163
164 static void workers_register(h2_mplx *m) {
165     /* Initially, there was ref count increase for this as well, but
166      * this is not needed, even harmful.
167      * h2_workers is only a hub for all the h2_worker instances.
168      * At the end-of-life of this h2_mplx, we always unregister at
169      * the workers. The thing to manage are all the h2_worker instances
170      * out there. Those may hold a reference to this h2_mplx and we cannot
171      * call them to unregister.
172      * 
173      * Therefore: ref counting for h2_workers in not needed, ref counting
174      * for h2_worker using this is critical.
175      */
176     h2_workers_register(m->workers, m);
177 }
178
179 static void workers_unregister(h2_mplx *m) {
180     h2_workers_unregister(m->workers, m);
181 }
182
183 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
184 {
185     apr_status_t status;
186     workers_unregister(m);
187
188     status = apr_thread_mutex_lock(m->lock);
189     if (APR_SUCCESS == status) {
190         int attempts = 0;
191         
192         release(m);
193         while (apr_atomic_read32(&m->refs) > 0) {
194             m->join_wait = wait;
195             ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), 
196                           0, m->c,
197                           "h2_mplx(%ld): release_join, refs=%d, waiting...", 
198                           m->id, m->refs);
199             apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
200             if (++attempts >= 6) {
201                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
202                               APLOGNO(02952) 
203                               "h2_mplx(%ld): join attempts exhausted, refs=%d", 
204                               m->id, m->refs);
205                 break;
206             }
207         }
208         if (m->join_wait) {
209             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
210                           "h2_mplx(%ld): release_join -> destroy", m->id);
211         }
212         m->join_wait = NULL;
213         apr_thread_mutex_unlock(m->lock);
214         h2_mplx_destroy(m);
215     }
216     return status;
217 }
218
219 void h2_mplx_abort(h2_mplx *m)
220 {
221     apr_status_t status;
222     AP_DEBUG_ASSERT(m);
223     status = apr_thread_mutex_lock(m->lock);
224     if (APR_SUCCESS == status) {
225         m->aborted = 1;
226         h2_io_set_destroy_all(m->stream_ios);
227         apr_thread_mutex_unlock(m->lock);
228     }
229     workers_unregister(m);
230 }
231
232
233 h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
234 {
235     h2_stream *stream = NULL;
236     apr_status_t status; 
237     h2_io *io;
238
239     if (m->aborted) {
240         return NULL;
241     }
242     status = apr_thread_mutex_lock(m->lock);
243     if (APR_SUCCESS == status) {
244         apr_pool_t *stream_pool = m->spare_pool;
245         
246         if (!stream_pool) {
247             apr_pool_create(&stream_pool, m->pool);
248         }
249         else {
250             m->spare_pool = NULL;
251         }
252         
253         stream = h2_stream_create(stream_id, stream_pool, m);
254         stream->state = H2_STREAM_ST_OPEN;
255         
256         io = h2_io_set_get(m->stream_ios, stream_id);
257         if (!io) {
258             io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
259             h2_io_set_add(m->stream_ios, io);
260         }
261         status = io? APR_SUCCESS : APR_ENOMEM;
262         apr_thread_mutex_unlock(m->lock);
263     }
264     return stream;
265 }
266
267 static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
268 {
269     apr_pool_t *pool = h2_stream_detach_pool(stream);
270     if (pool) {
271         apr_pool_clear(pool);
272         if (m->spare_pool) {
273             apr_pool_destroy(m->spare_pool);
274         }
275         m->spare_pool = pool;
276     }
277     h2_stream_destroy(stream);
278     if (io) {
279         /* The pool is cleared/destroyed which also closes all
280          * allocated file handles. Give this count back to our
281          * file handle pool. */
282         m->file_handles_allowed += io->files_handles_owned;
283         h2_io_set_remove(m->stream_ios, io);
284         h2_io_set_remove(m->ready_ios, io);
285         h2_io_destroy(io);
286     }
287 }
288
289 apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
290 {
291     apr_status_t status;
292     AP_DEBUG_ASSERT(m);
293     status = apr_thread_mutex_lock(m->lock);
294     if (APR_SUCCESS == status) {
295         h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
296         if (!io || io->task_done) {
297             /* No more io or task already done -> cleanup immediately */
298             stream_destroy(m, stream, io);
299         }
300         else {
301             /* Add stream to closed set for cleanup when task is done */
302             h2_stream_set_add(m->closed, stream);
303         }
304         apr_thread_mutex_unlock(m->lock);
305     }
306     return status;
307 }
308
309 void h2_mplx_task_done(h2_mplx *m, int stream_id)
310 {
311     apr_status_t status = apr_thread_mutex_lock(m->lock);
312     if (APR_SUCCESS == status) {
313         h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
314         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
315         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
316                       "h2_mplx(%ld): task(%d) done", m->id, stream_id);
317         if (stream) {
318             /* stream was already closed by main connection and is in 
319              * zombie state. Now that the task is done with it, we
320              * can free its resources. */
321             h2_stream_set_remove(m->closed, stream);
322             stream_destroy(m, stream, io);
323         }
324         else if (io) {
325             /* main connection has not finished stream. Mark task as done
326              * so that eventual cleanup can start immediately. */
327             io->task_done = 1;
328         }
329         apr_thread_mutex_unlock(m->lock);
330     }
331 }
332
333 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
334                              int stream_id, apr_bucket_brigade *bb,
335                              struct apr_thread_cond_t *iowait)
336 {
337     apr_status_t status; 
338     AP_DEBUG_ASSERT(m);
339     if (m->aborted) {
340         return APR_ECONNABORTED;
341     }
342     status = apr_thread_mutex_lock(m->lock);
343     if (APR_SUCCESS == status) {
344         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
345         if (io) {
346             io->input_arrived = iowait;
347             status = h2_io_in_read(io, bb, 0);
348             while (status == APR_EAGAIN 
349                    && !is_aborted(m, &status)
350                    && block == APR_BLOCK_READ) {
351                 apr_thread_cond_wait(io->input_arrived, m->lock);
352                 status = h2_io_in_read(io, bb, 0);
353             }
354             io->input_arrived = NULL;
355         }
356         else {
357             status = APR_EOF;
358         }
359         apr_thread_mutex_unlock(m->lock);
360     }
361     return status;
362 }
363
364 apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
365                               apr_bucket_brigade *bb)
366 {
367     apr_status_t status;
368     AP_DEBUG_ASSERT(m);
369     if (m->aborted) {
370         return APR_ECONNABORTED;
371     }
372     status = apr_thread_mutex_lock(m->lock);
373     if (APR_SUCCESS == status) {
374         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
375         if (io) {
376             status = h2_io_in_write(io, bb);
377             if (io->input_arrived) {
378                 apr_thread_cond_signal(io->input_arrived);
379             }
380         }
381         else {
382             status = APR_EOF;
383         }
384         apr_thread_mutex_unlock(m->lock);
385     }
386     return status;
387 }
388
389 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
390 {
391     apr_status_t status;
392     AP_DEBUG_ASSERT(m);
393     if (m->aborted) {
394         return APR_ECONNABORTED;
395     }
396     status = apr_thread_mutex_lock(m->lock);
397     if (APR_SUCCESS == status) {
398         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
399         if (io) {
400             status = h2_io_in_close(io);
401             if (io->input_arrived) {
402                 apr_thread_cond_signal(io->input_arrived);
403             }
404         }
405         else {
406             status = APR_ECONNABORTED;
407         }
408         apr_thread_mutex_unlock(m->lock);
409     }
410     return status;
411 }
412
413 typedef struct {
414     h2_mplx_consumed_cb *cb;
415     void *cb_ctx;
416     int streams_updated;
417 } update_ctx;
418
419 static int update_window(void *ctx, h2_io *io)
420 {
421     if (io->input_consumed) {
422         update_ctx *uctx = (update_ctx*)ctx;
423         uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
424         io->input_consumed = 0;
425         ++uctx->streams_updated;
426     }
427     return 1;
428 }
429
430 apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
431                                        h2_mplx_consumed_cb *cb, void *cb_ctx)
432 {
433     apr_status_t status;
434     AP_DEBUG_ASSERT(m);
435     if (m->aborted) {
436         return APR_ECONNABORTED;
437     }
438     status = apr_thread_mutex_lock(m->lock);
439     if (APR_SUCCESS == status) {
440         update_ctx ctx;
441         
442         ctx.cb              = cb;
443         ctx.cb_ctx          = cb_ctx;
444         ctx.streams_updated = 0;
445
446         status = APR_EAGAIN;
447         h2_io_set_iter(m->stream_ios, update_window, &ctx);
448         
449         if (ctx.streams_updated) {
450             status = APR_SUCCESS;
451         }
452         apr_thread_mutex_unlock(m->lock);
453     }
454     return status;
455 }
456
457 apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, 
458                                h2_io_data_cb *cb, void *ctx, 
459                                apr_size_t *plen, int *peos)
460 {
461     apr_status_t status;
462     AP_DEBUG_ASSERT(m);
463     if (m->aborted) {
464         return APR_ECONNABORTED;
465     }
466     status = apr_thread_mutex_lock(m->lock);
467     if (APR_SUCCESS == status) {
468         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
469         if (io) {
470             status = h2_io_out_readx(io, cb, ctx, plen, peos);
471             if (status == APR_SUCCESS && io->output_drained) {
472                 apr_thread_cond_signal(io->output_drained);
473             }
474         }
475         else {
476             status = APR_ECONNABORTED;
477         }
478         apr_thread_mutex_unlock(m->lock);
479     }
480     return status;
481 }
482
483 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
484 {
485     apr_status_t status;
486     h2_stream *stream = NULL;
487     AP_DEBUG_ASSERT(m);
488     if (m->aborted) {
489         return NULL;
490     }
491     status = apr_thread_mutex_lock(m->lock);
492     if (APR_SUCCESS == status) {
493         h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
494         if (io) {
495             h2_response *response = io->response;
496             
497             AP_DEBUG_ASSERT(response);
498             h2_io_set_remove(m->ready_ios, io);
499             
500             stream = h2_stream_set_get(streams, response->stream_id);
501             if (stream) {
502                 h2_stream_set_response(stream, response, io->bbout);
503                 if (io->output_drained) {
504                     apr_thread_cond_signal(io->output_drained);
505                 }
506             }
507             else {
508                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
509                               APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
510                               m->id, response->stream_id);
511             }
512         }
513         apr_thread_mutex_unlock(m->lock);
514     }
515     return stream;
516 }
517
518 static apr_status_t out_write(h2_mplx *m, h2_io *io, 
519                               ap_filter_t* f, apr_bucket_brigade *bb,
520                               struct apr_thread_cond_t *iowait)
521 {
522     apr_status_t status = APR_SUCCESS;
523     /* We check the memory footprint queued for this stream_id
524      * and block if it exceeds our configured limit.
525      * We will not split buckets to enforce the limit to the last
526      * byte. After all, the bucket is already in memory.
527      */
528     while (!APR_BRIGADE_EMPTY(bb) 
529            && (status == APR_SUCCESS)
530            && !is_aborted(m, &status)) {
531         
532         status = h2_io_out_write(io, bb, m->stream_max_mem, 
533                                  &m->file_handles_allowed);
534         
535         /* Wait for data to drain until there is room again */
536         while (!APR_BRIGADE_EMPTY(bb) 
537                && iowait
538                && status == APR_SUCCESS
539                && (m->stream_max_mem <= h2_io_out_length(io))
540                && !is_aborted(m, &status)) {
541             io->output_drained = iowait;
542             if (f) {
543                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
544                               "h2_mplx(%ld-%d): waiting for out drain", 
545                               m->id, io->id);
546             }
547             apr_thread_cond_wait(io->output_drained, m->lock);
548             io->output_drained = NULL;
549         }
550     }
551     apr_brigade_cleanup(bb);
552     return status;
553 }
554
555 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
556                              ap_filter_t* f, apr_bucket_brigade *bb,
557                              struct apr_thread_cond_t *iowait)
558 {
559     apr_status_t status = APR_SUCCESS;
560     
561     h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
562     if (io) {
563         if (f) {
564             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
565                           "h2_mplx(%ld-%d): open response: %s",
566                           m->id, stream_id, response->status);
567         }
568         
569         io->response = h2_response_copy(io->pool, response);
570         AP_DEBUG_ASSERT(io->response);
571         h2_io_set_add(m->ready_ios, io);
572         if (bb) {
573             status = out_write(m, io, f, bb, iowait);
574         }
575         have_out_data_for(m, stream_id);
576     }
577     else {
578         status = APR_ECONNABORTED;
579     }
580     return status;
581 }
582
583 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
584                               ap_filter_t* f, apr_bucket_brigade *bb,
585                               struct apr_thread_cond_t *iowait)
586 {
587     apr_status_t status;
588     AP_DEBUG_ASSERT(m);
589     if (m->aborted) {
590         return APR_ECONNABORTED;
591     }
592     status = apr_thread_mutex_lock(m->lock);
593     if (APR_SUCCESS == status) {
594         status = out_open(m, stream_id, response, f, bb, iowait);
595         if (m->aborted) {
596             return APR_ECONNABORTED;
597         }
598         apr_thread_mutex_unlock(m->lock);
599     }
600     return status;
601 }
602
603
604 apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, 
605                                ap_filter_t* f, apr_bucket_brigade *bb,
606                                struct apr_thread_cond_t *iowait)
607 {
608     apr_status_t status;
609     AP_DEBUG_ASSERT(m);
610     if (m->aborted) {
611         return APR_ECONNABORTED;
612     }
613     status = apr_thread_mutex_lock(m->lock);
614     if (APR_SUCCESS == status) {
615         if (!m->aborted) {
616             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
617             if (io) {
618                 status = out_write(m, io, f, bb, iowait);
619                 have_out_data_for(m, stream_id);
620                 if (m->aborted) {
621                     return APR_ECONNABORTED;
622                 }
623             }
624             else {
625                 status = APR_ECONNABORTED;
626             }
627         }
628         
629         if (m->lock) {
630             apr_thread_mutex_unlock(m->lock);
631         }
632     }
633     return status;
634 }
635
636 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
637 {
638     apr_status_t status;
639     AP_DEBUG_ASSERT(m);
640     if (m->aborted) {
641         return APR_ECONNABORTED;
642     }
643     status = apr_thread_mutex_lock(m->lock);
644     if (APR_SUCCESS == status) {
645         if (!m->aborted) {
646             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
647             if (io) {
648                 if (!io->response || !io->response->ngheader) {
649                     /* In case a close comes before a response was created,
650                      * insert an error one so that our streams can properly
651                      * reset.
652                      */
653                     h2_response *r = h2_response_create(stream_id, 
654                                                         "500", NULL, m->pool);
655                     status = out_open(m, stream_id, r, NULL, NULL, NULL);
656                 }
657                 status = h2_io_out_close(io);
658                 have_out_data_for(m, stream_id);
659                 if (m->aborted) {
660                     /* if we were the last output, the whole session might
661                      * have gone down in the meantime.
662                      */
663                     return APR_SUCCESS;
664                 }
665             }
666             else {
667                 status = APR_ECONNABORTED;
668             }
669         }
670         apr_thread_mutex_unlock(m->lock);
671     }
672     return status;
673 }
674
675 int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
676 {
677     int has_eos = 0;
678     apr_status_t status;
679     AP_DEBUG_ASSERT(m);
680     if (m->aborted) {
681         return 0;
682     }
683     status = apr_thread_mutex_lock(m->lock);
684     if (APR_SUCCESS == status) {
685         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
686         if (io) {
687             has_eos = h2_io_in_has_eos_for(io);
688         }
689         apr_thread_mutex_unlock(m->lock);
690     }
691     return has_eos;
692 }
693
694 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
695 {
696     apr_status_t status;
697     int has_data = 0;
698     AP_DEBUG_ASSERT(m);
699     if (m->aborted) {
700         return 0;
701     }
702     status = apr_thread_mutex_lock(m->lock);
703     if (APR_SUCCESS == status) {
704         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
705         if (io) {
706             has_data = h2_io_out_has_data(io);
707         }
708         apr_thread_mutex_unlock(m->lock);
709     }
710     return has_data;
711 }
712
713 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
714                                  apr_thread_cond_t *iowait)
715 {
716     apr_status_t status;
717     AP_DEBUG_ASSERT(m);
718     if (m->aborted) {
719         return APR_ECONNABORTED;
720     }
721     status = apr_thread_mutex_lock(m->lock);
722     if (APR_SUCCESS == status) {
723         m->added_output = iowait;
724         status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
725         if (APLOGctrace2(m->c)) {
726             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
727                           "h2_mplx(%ld): trywait on data for %f ms)",
728                           m->id, timeout/1000.0);
729         }
730         m->added_output = NULL;
731         apr_thread_mutex_unlock(m->lock);
732     }
733     return status;
734 }
735
736 static void have_out_data_for(h2_mplx *m, int stream_id)
737 {
738     (void)stream_id;
739     AP_DEBUG_ASSERT(m);
740     if (m->added_output) {
741         apr_thread_cond_signal(m->added_output);
742     }
743 }
744
745 apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
746 {
747     apr_status_t status;
748     AP_DEBUG_ASSERT(m);
749     if (m->aborted) {
750         return APR_ECONNABORTED;
751     }
752     status = apr_thread_mutex_lock(m->lock);
753     if (APR_SUCCESS == status) {
754         /* TODO: needs to sort queue by priority */
755         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
756                       "h2_mplx: do task(%s)", task->id);
757         h2_tq_append(m->q, task);
758         apr_thread_mutex_unlock(m->lock);
759     }
760     workers_register(m);
761     return status;
762 }
763
764 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
765 {
766     h2_task *task = NULL;
767     apr_status_t status;
768     AP_DEBUG_ASSERT(m);
769     if (m->aborted) {
770         *has_more = 0;
771         return NULL;
772     }
773     status = apr_thread_mutex_lock(m->lock);
774     if (APR_SUCCESS == status) {
775         task = h2_tq_pop_first(m->q);
776         if (task) {
777             h2_task_set_started(task);
778         }
779         *has_more = !h2_tq_empty(m->q);
780         apr_thread_mutex_unlock(m->lock);
781     }
782     return task;
783 }
784
785 apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
786 {
787     apr_status_t status;
788     AP_DEBUG_ASSERT(m);
789     if (m->aborted) {
790         return APR_ECONNABORTED;
791     }
792     status = apr_thread_mutex_lock(m->lock);
793     if (APR_SUCCESS == status) {
794         conn_rec *c = h2_conn_create(m->c, stream->pool);
795         stream->task = h2_task_create(m->id, stream->id, 
796                                       stream->pool, m, c);
797         
798         apr_thread_mutex_unlock(m->lock);
799     }
800     return status;
801 }
802