]> granicus.if.org Git - apache/blob - modules/http2/h2_mplx.c
Backport
[apache] / modules / http2 / h2_mplx.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18 #include <stdlib.h>
19
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22 #include <apr_strings.h>
23 #include <apr_time.h>
24
25 #include <httpd.h>
26 #include <http_core.h>
27 #include <http_log.h>
28
29 #include "mod_http2.h"
30
31 #include "h2_private.h"
32 #include "h2_bucket_beam.h"
33 #include "h2_config.h"
34 #include "h2_conn.h"
35 #include "h2_ctx.h"
36 #include "h2_h2.h"
37 #include "h2_response.h"
38 #include "h2_mplx.h"
39 #include "h2_ngn_shed.h"
40 #include "h2_request.h"
41 #include "h2_stream.h"
42 #include "h2_task.h"
43 #include "h2_worker.h"
44 #include "h2_workers.h"
45 #include "h2_util.h"
46
47
48 static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, 
49                         conn_rec *c, int level)
50 {
51     if (beam && APLOG_C_IS_LEVEL(c,level)) {
52         char buffer[2048];
53         apr_size_t off = 0;
54         
55         off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
56         off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
57         off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
58         off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
59         off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
60
61         ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s", 
62                       c->id, id, msg, buffer);
63     }
64 }
65
66 /* utility for iterating over ihash task sets */
67 typedef struct {
68     h2_mplx *m;
69     h2_task *task;
70     apr_time_t now;
71 } task_iter_ctx;
72
73 /* NULL or the mutex hold by this thread, used for recursive calls
74  */
75 static apr_threadkey_t *thread_lock;
76
77 apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
78 {
79     return apr_threadkey_private_create(&thread_lock, NULL, pool);
80 }
81
82 static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
83 {
84     apr_status_t status;
85     void *mutex = NULL;
86     
87     /* Enter the mutex if this thread already holds the lock or
88      * if we can acquire it. Only on the later case do we unlock
89      * onleaving the mutex.
90      * This allow recursive entering of the mutex from the saem thread,
91      * which is what we need in certain situations involving callbacks
92      */
93     AP_DEBUG_ASSERT(m);
94     apr_threadkey_private_get(&mutex, thread_lock);
95     if (mutex == m->lock) {
96         *pacquired = 0;
97         return APR_SUCCESS;
98     }
99
100     AP_DEBUG_ASSERT(m->lock);
101     status = apr_thread_mutex_lock(m->lock);
102     *pacquired = (status == APR_SUCCESS);
103     if (*pacquired) {
104         apr_threadkey_private_set(m->lock, thread_lock);
105     }
106     return status;
107 }
108
109 static void leave_mutex(h2_mplx *m, int acquired)
110 {
111     if (acquired) {
112         apr_threadkey_private_set(NULL, thread_lock);
113         apr_thread_mutex_unlock(m->lock);
114     }
115 }
116
117 static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
118 {
119     leave_mutex(ctx, 1);
120 }
121
122 static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
123 {
124     h2_mplx *m = ctx;
125     int acquired;
126     apr_status_t status;
127     
128     status = enter_mutex(m, &acquired);
129     if (status == APR_SUCCESS) {
130         pbl->mutex = m->lock;
131         pbl->leave = acquired? beam_leave : NULL;
132         pbl->leave_ctx = m;
133     }
134     return status;
135 }
136
137 static void stream_output_consumed(void *ctx, 
138                                    h2_bucket_beam *beam, apr_off_t length)
139 {
140     h2_task *task = ctx;
141     if (length > 0 && task && task->assigned) {
142         h2_req_engine_out_consumed(task->assigned, task->c, length); 
143     }
144 }
145
146 static void stream_input_consumed(void *ctx, 
147                                   h2_bucket_beam *beam, apr_off_t length)
148 {
149     h2_mplx *m = ctx;
150     if (m->input_consumed && length) {
151         m->input_consumed(m->input_consumed_ctx, beam->id, length);
152     }
153 }
154
155 static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
156 {
157     h2_mplx *m = ctx;
158     if (m->tx_handles_reserved > 0) {
159         --m->tx_handles_reserved;
160         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
161                       "h2_mplx(%ld-%d): beaming file %s, tx_avail %d", 
162                       m->id, beam->id, beam->tag, m->tx_handles_reserved);
163         return 1;
164     }
165     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
166                   "h2_mplx(%ld-%d): can_beam_file denied on %s", 
167                   m->id, beam->id, beam->tag);
168     return 0;
169 }
170
171 static void have_out_data_for(h2_mplx *m, int stream_id);
172 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
173
174 static void check_tx_reservation(h2_mplx *m) 
175 {
176     if (m->tx_handles_reserved <= 0) {
177         m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, 
178             H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
179     }
180 }
181
182 static void check_tx_free(h2_mplx *m) 
183 {
184     if (m->tx_handles_reserved > m->tx_chunk_size) {
185         apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size;
186         m->tx_handles_reserved = m->tx_chunk_size;
187         h2_workers_tx_free(m->workers, count);
188     }
189     else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
190         h2_workers_tx_free(m->workers, m->tx_handles_reserved);
191         m->tx_handles_reserved = 0;
192     }
193 }
194
195 static int purge_stream(void *ctx, void *val) 
196 {
197     h2_mplx *m = ctx;
198     h2_stream *stream = val;
199     h2_task *task = h2_ihash_get(m->tasks, stream->id);
200     h2_ihash_remove(m->spurge, stream->id);
201     h2_stream_destroy(stream);
202     if (task) {
203         task_destroy(m, task, 1);
204     }
205     return 0;
206 }
207
208 static void purge_streams(h2_mplx *m)
209 {
210     if (!h2_ihash_empty(m->spurge)) {
211         while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
212             /* repeat until empty */
213         }
214         h2_ihash_clear(m->spurge);
215     }
216 }
217
218 static void h2_mplx_destroy(h2_mplx *m)
219 {
220     AP_DEBUG_ASSERT(m);
221     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
222                   "h2_mplx(%ld): destroy, tasks=%d", 
223                   m->id, (int)h2_ihash_count(m->tasks));
224     check_tx_free(m);
225     if (m->pool) {
226         apr_pool_destroy(m->pool);
227     }
228 }
229
230 /**
231  * A h2_mplx needs to be thread-safe *and* if will be called by
232  * the h2_session thread *and* the h2_worker threads. Therefore:
233  * - calls are protected by a mutex lock, m->lock
234  * - the pool needs its own allocator, since apr_allocator_t are 
235  *   not re-entrant. The separate allocator works without a 
236  *   separate lock since we already protect h2_mplx itself.
237  *   Since HTTP/2 connections can be expected to live longer than
238  *   their HTTP/1 cousins, the separate allocator seems to work better
239  *   than protecting a shared h2_session one with an own lock.
240  */
241 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, 
242                         const h2_config *conf, 
243                         apr_interval_time_t stream_timeout,
244                         h2_workers *workers)
245 {
246     apr_status_t status = APR_SUCCESS;
247     apr_allocator_t *allocator = NULL;
248     h2_mplx *m;
249     AP_DEBUG_ASSERT(conf);
250     
251     status = apr_allocator_create(&allocator);
252     if (status != APR_SUCCESS) {
253         return NULL;
254     }
255
256     m = apr_pcalloc(parent, sizeof(h2_mplx));
257     if (m) {
258         m->id = c->id;
259         APR_RING_ELEM_INIT(m, link);
260         m->c = c;
261         apr_pool_create_ex(&m->pool, parent, NULL, allocator);
262         if (!m->pool) {
263             return NULL;
264         }
265         apr_pool_tag(m->pool, "h2_mplx");
266         apr_allocator_owner_set(allocator, m->pool);
267         
268         status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
269                                          m->pool);
270         if (status != APR_SUCCESS) {
271             h2_mplx_destroy(m);
272             return NULL;
273         }
274         
275         status = apr_thread_cond_create(&m->task_thawed, m->pool);
276         if (status != APR_SUCCESS) {
277             h2_mplx_destroy(m);
278             return NULL;
279         }
280     
281         m->bucket_alloc = apr_bucket_alloc_create(m->pool);
282         m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
283         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
284
285         m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
286         m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
287         m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
288         m->q = h2_iq_create(m->pool, m->max_streams);
289         m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
290         m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
291         m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
292
293         m->stream_timeout = stream_timeout;
294         m->workers = workers;
295         m->workers_max = workers->max_workers;
296         m->workers_def_limit = 4;
297         m->workers_limit = m->workers_def_limit;
298         m->last_limit_change = m->last_idle_block = apr_time_now();
299         m->limit_change_interval = apr_time_from_msec(200);
300         
301         m->tx_handles_reserved = 0;
302         m->tx_chunk_size = 4;
303         
304         m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
305         
306         m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams, 
307                                          m->stream_max_mem);
308         h2_ngn_shed_set_ctx(m->ngn_shed , m);
309     }
310     return m;
311 }
312
313 apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
314 {
315     int acquired, max_stream_started = 0;
316     
317     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
318         max_stream_started = m->max_stream_started;
319         /* Clear schedule queue, disabling existing streams from starting */ 
320         h2_iq_clear(m->q);
321         leave_mutex(m, acquired);
322     }
323     return max_stream_started;
324 }
325
326 static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
327 {
328     if (stream->input && stream->started) {
329         h2_beam_send(stream->input, NULL, 0); /* trigger updates */
330     }
331 }
332
333 static int output_consumed_signal(h2_mplx *m, h2_task *task)
334 {
335     if (task->output.beam && task->worker_started && task->assigned) {
336         /* trigger updates */
337         h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
338     }
339     return 0;
340 }
341
342
343 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
344 {
345     conn_rec *slave = NULL;
346     int reuse_slave = 0;
347     apr_status_t status;
348     
349     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
350                   "h2_task(%s): destroy", task->id);
351     if (called_from_master) {
352         /* Process outstanding events before destruction */
353         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
354         if (stream) {
355             input_consumed_signal(m, stream);
356         }
357     }
358     
359     /* The pool is cleared/destroyed which also closes all
360      * allocated file handles. Give this count back to our
361      * file handle pool. */
362     if (task->output.beam) {
363         m->tx_handles_reserved += 
364         h2_beam_get_files_beamed(task->output.beam);
365         h2_beam_on_produced(task->output.beam, NULL, NULL);
366         status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
367         if (status != APR_SUCCESS){
368             ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, 
369                           APLOGNO(03385) "h2_task(%s): output shutdown "
370                           "incomplete", task->id);
371         }
372     }
373     
374     slave = task->c;
375     reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
376                    && !task->rst_error);
377     
378     h2_ihash_remove(m->tasks, task->stream_id);
379     if (m->redo_tasks) {
380         h2_ihash_remove(m->redo_tasks, task->stream_id);
381     }
382     h2_task_destroy(task);
383
384     if (slave) {
385         if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
386             APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
387         }
388         else {
389             slave->sbh = NULL;
390             h2_slave_destroy(slave, NULL);
391         }
392     }
393     
394     check_tx_free(m);
395 }
396
397 static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) 
398 {
399     h2_task *task;
400     
401     ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
402                   "h2_stream(%ld-%d): done", m->c->id, stream->id);
403     /* Situation: we are, on the master connection, done with processing
404      * the stream. Either we have handled it successfully, or the stream
405      * was reset by the client or the connection is gone and we are 
406      * shutting down the whole session.
407      *
408      * We possibly have created a task for this stream to be processed
409      * on a slave connection. The processing might actually be ongoing
410      * right now or has already finished. A finished task waits for its
411      * stream to be done. This is the common case.
412      * 
413      * If the stream had input (e.g. the request had a body), a task
414      * may have read, or is still reading buckets from the input beam.
415      * This means that the task is referencing memory from the stream's
416      * pool (or the master connection bucket alloc). Before we can free
417      * the stream pool, we need to make sure that those references are
418      * gone. This is what h2_beam_shutdown() on the input waits for.
419      *
420      * With the input handled, we can tear down that beam and care
421      * about the output beam. The stream might still have buffered some
422      * buckets read from the output, so we need to get rid of those. That
423      * is done by h2_stream_cleanup().
424      *
425      * Now it is save to destroy the task (if it exists and is finished).
426      * 
427      * FIXME: we currently destroy the stream, even if the task is still
428      * ongoing. This is not ok, since task->request is coming from stream
429      * memory. We should either copy it on task creation or wait with the
430      * stream destruction until the task is done. 
431      */
432     h2_iq_remove(m->q, stream->id);
433     h2_ihash_remove(m->sready, stream->id);
434     h2_ihash_remove(m->sresume, stream->id);
435     h2_ihash_remove(m->streams, stream->id);
436     if (stream->input) {
437         m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
438         h2_beam_on_consumed(stream->input, NULL, NULL);
439         /* Let anyone blocked reading know that there is no more to come */
440         h2_beam_abort(stream->input);
441         /* Remove mutex after, so that abort still finds cond to signal */
442         h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
443     }
444     h2_stream_cleanup(stream);
445
446     task = h2_ihash_get(m->tasks, stream->id);
447     if (task) {
448         if (!task->worker_done) {
449             /* task still running, cleanup once it is done */
450             if (rst_error) {
451                 h2_task_rst(task, rst_error);
452             }
453             h2_ihash_add(m->shold, stream);
454             return;
455         }
456         else {
457             /* already finished */
458             task_destroy(m, task, 0);
459         }
460     }
461     h2_stream_destroy(stream);
462 }
463
464 static int stream_done_iter(void *ctx, void *val)
465 {
466     stream_done((h2_mplx*)ctx, val, 0);
467     return 0;
468 }
469
470 static int task_print(void *ctx, void *val)
471 {
472     h2_mplx *m = ctx;
473     h2_task *task = val;
474
475     if (task && task->request) {
476         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
477
478         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
479                       "->03198: h2_stream(%s): %s %s %s -> %s %d"
480                       "[orph=%d/started=%d/done=%d]", 
481                       task->id, task->request->method, 
482                       task->request->authority, task->request->path,
483                       task->response? "http" : (task->rst_error? "reset" : "?"),
484                       task->response? task->response->http_status : task->rst_error,
485                       (stream? 0 : 1), task->worker_started, 
486                       task->worker_done);
487     }
488     else if (task) {
489         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
490                       "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
491     }
492     else {
493         ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
494                       "->03198: h2_stream(%ld-NULL): NULL", m->id);
495     }
496     return 1;
497 }
498
499 static int task_abort_connection(void *ctx, void *val)
500 {
501     h2_task *task = val;
502     if (task->c) {
503         task->c->aborted = 1;
504     }
505     if (task->input.beam) {
506         h2_beam_abort(task->input.beam);
507     }
508     if (task->output.beam) {
509         h2_beam_abort(task->output.beam);
510     }
511     return 1;
512 }
513
514 static int report_stream_iter(void *ctx, void *val) {
515     h2_mplx *m = ctx;
516     h2_stream *stream = val;
517     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
518                   "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
519                   "submitted=%d, suspended=%d", 
520                   m->id, stream->id, stream->started, stream->scheduled,
521                   stream->submitted, stream->suspended);
522     return 1;
523 }
524
525 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
526 {
527     apr_status_t status;
528     int acquired;
529
530     h2_workers_unregister(m->workers, m);
531     
532     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
533         int i, wait_secs = 5;
534
535         if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
536             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
537                           "h2_mplx(%ld): release_join with %d streams open, "
538                           "%d streams resume, %d streams ready, %d tasks", 
539                           m->id, (int)h2_ihash_count(m->streams),
540                           (int)h2_ihash_count(m->sresume), 
541                           (int)h2_ihash_count(m->sready), 
542                           (int)h2_ihash_count(m->tasks));
543             h2_ihash_iter(m->streams, report_stream_iter, m);
544         }
545         
546         /* disable WINDOW_UPDATE callbacks */
547         h2_mplx_set_consumed_cb(m, NULL, NULL);
548         
549         if (!h2_ihash_empty(m->shold)) {
550             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
551                           "h2_mplx(%ld): start release_join with %d streams in hold", 
552                           m->id, (int)h2_ihash_count(m->shold));
553         }
554         if (!h2_ihash_empty(m->spurge)) {
555             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
556                           "h2_mplx(%ld): start release_join with %d streams to purge", 
557                           m->id, (int)h2_ihash_count(m->spurge));
558         }
559         
560         h2_iq_clear(m->q);
561         apr_thread_cond_broadcast(m->task_thawed);
562         while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
563             /* iterate until all streams have been removed */
564         }
565         AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
566     
567         if (!h2_ihash_empty(m->shold)) {
568             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
569                           "h2_mplx(%ld): 2. release_join with %d streams in hold", 
570                           m->id, (int)h2_ihash_count(m->shold));
571         }
572         if (!h2_ihash_empty(m->spurge)) {
573             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
574                           "h2_mplx(%ld): 2. release_join with %d streams to purge", 
575                           m->id, (int)h2_ihash_count(m->spurge));
576         }
577         
578         /* If we still have busy workers, we cannot release our memory
579          * pool yet, as tasks have references to us.
580          * Any operation on the task slave connection will from now on
581          * be errored ECONNRESET/ABORTED, so processing them should fail 
582          * and workers *should* return in a timely fashion.
583          */
584         for (i = 0; m->workers_busy > 0; ++i) {
585             h2_ihash_iter(m->tasks, task_abort_connection, m);
586             
587             m->join_wait = wait;
588             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
589             
590             if (APR_STATUS_IS_TIMEUP(status)) {
591                 if (i > 0) {
592                     /* Oh, oh. Still we wait for assigned  workers to report that 
593                      * they are done. Unless we have a bug, a worker seems to be hanging. 
594                      * If we exit now, all will be deallocated and the worker, once 
595                      * it does return, will walk all over freed memory...
596                      */
597                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
598                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
599                                   "%d h2_workers to return, have still %d tasks outstanding", 
600                                   m->id, i*wait_secs, m->workers_busy,
601                                   (int)h2_ihash_count(m->tasks));
602                     if (i == 1) {
603                         h2_ihash_iter(m->tasks, task_print, m);
604                     }
605                 }
606                 h2_mplx_abort(m);
607                 apr_thread_cond_broadcast(m->task_thawed);
608             }
609         }
610         
611         AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
612         if (!h2_ihash_empty(m->spurge)) {
613             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
614                           "h2_mplx(%ld): 3. release_join %d streams to purge", 
615                           m->id, (int)h2_ihash_count(m->spurge));
616             purge_streams(m);
617         }
618         AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
619         
620         if (!h2_ihash_empty(m->tasks)) {
621             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
622                           "h2_mplx(%ld): release_join -> destroy, "
623                           "%d tasks still present", 
624                           m->id, (int)h2_ihash_count(m->tasks));
625         }
626         leave_mutex(m, acquired);
627         h2_mplx_destroy(m);
628         /* all gone */
629     }
630     return status;
631 }
632
633 void h2_mplx_abort(h2_mplx *m)
634 {
635     int acquired;
636     
637     AP_DEBUG_ASSERT(m);
638     if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
639         m->aborted = 1;
640         h2_ngn_shed_abort(m->ngn_shed);
641         leave_mutex(m, acquired);
642     }
643 }
644
645 apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
646 {
647     apr_status_t status = APR_SUCCESS;
648     int acquired;
649     
650     AP_DEBUG_ASSERT(m);
651     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
652         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
653                       "h2_mplx(%ld-%d): marking stream as done.", 
654                       m->id, stream->id);
655         stream_done(m, stream, stream->rst_error);
656         purge_streams(m);
657         leave_mutex(m, acquired);
658     }
659     return status;
660 }
661
662 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
663 {
664     m->input_consumed = cb;
665     m->input_consumed_ctx = ctx;
666 }
667
668 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
669 {
670     apr_status_t status = APR_SUCCESS;
671     h2_task *task = h2_ihash_get(m->tasks, stream_id);
672     h2_stream *stream = h2_ihash_get(m->streams, stream_id);
673     
674     if (!task || !stream) {
675         return APR_ECONNABORTED;
676     }
677     
678     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
679                   "h2_mplx(%s): open response: %d, rst=%d",
680                   task->id, response->http_status, response->rst_error);
681     
682     h2_task_set_response(task, response);
683     
684     if (task->output.beam) {
685         h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
686         h2_beam_timeout_set(task->output.beam, m->stream_timeout);
687         h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
688         m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
689         h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
690         h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
691     }
692     
693     h2_ihash_add(m->sready, stream);
694     if (response && response->http_status < 300) {
695         /* we might see some file buckets in the output, see
696          * if we have enough handles reserved. */
697         check_tx_reservation(m);
698     }
699     have_out_data_for(m, stream_id);
700     return status;
701 }
702
703 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
704 {
705     apr_status_t status;
706     int acquired;
707     
708     AP_DEBUG_ASSERT(m);
709     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
710         if (m->aborted) {
711             status = APR_ECONNABORTED;
712         }
713         else {
714             status = out_open(m, stream_id, response);
715         }
716         leave_mutex(m, acquired);
717     }
718     return status;
719 }
720
721 static apr_status_t out_close(h2_mplx *m, h2_task *task)
722 {
723     apr_status_t status = APR_SUCCESS;
724     h2_stream *stream;
725     
726     if (!task) {
727         return APR_ECONNABORTED;
728     }
729
730     stream = h2_ihash_get(m->streams, task->stream_id);
731     if (!stream) {
732         return APR_ECONNABORTED;
733     }
734
735     if (!task->response && !task->rst_error) {
736         /* In case a close comes before a response was created,
737          * insert an error one so that our streams can properly reset.
738          */
739         h2_response *r = h2_response_die(task->stream_id, 500, 
740                                          task->request, m->pool);
741         status = out_open(m, task->stream_id, r);
742         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
743                       "h2_mplx(%s): close, no response, no rst", task->id);
744     }
745     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
746                   "h2_mplx(%s): close", task->id);
747     if (task->output.beam) {
748         status = h2_beam_close(task->output.beam);
749         h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
750                     APLOG_TRACE2);
751     }
752     output_consumed_signal(m, task);
753     have_out_data_for(m, task->stream_id);
754     return status;
755 }
756
757 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
758                                  apr_thread_cond_t *iowait)
759 {
760     apr_status_t status;
761     int acquired;
762     
763     AP_DEBUG_ASSERT(m);
764     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
765         if (m->aborted) {
766             status = APR_ECONNABORTED;
767         }
768         else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
769             status = APR_SUCCESS;
770         }
771         else {
772             purge_streams(m);
773             m->added_output = iowait;
774             status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
775             if (APLOGctrace2(m->c)) {
776                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
777                               "h2_mplx(%ld): trywait on data for %f ms)",
778                               m->id, timeout/1000.0);
779             }
780             m->added_output = NULL;
781         }
782         leave_mutex(m, acquired);
783     }
784     return status;
785 }
786
787 static void have_out_data_for(h2_mplx *m, int stream_id)
788 {
789     (void)stream_id;
790     AP_DEBUG_ASSERT(m);
791     if (m->added_output) {
792         apr_thread_cond_signal(m->added_output);
793     }
794 }
795
796 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
797 {
798     apr_status_t status;
799     int acquired;
800     
801     AP_DEBUG_ASSERT(m);
802     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
803         if (m->aborted) {
804             status = APR_ECONNABORTED;
805         }
806         else {
807             h2_iq_sort(m->q, cmp, ctx);
808             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
809                           "h2_mplx(%ld): reprioritize tasks", m->id);
810         }
811         leave_mutex(m, acquired);
812     }
813     return status;
814 }
815
816 apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, 
817                              h2_stream_pri_cmp *cmp, void *ctx)
818 {
819     apr_status_t status;
820     int do_registration = 0;
821     int acquired;
822     
823     AP_DEBUG_ASSERT(m);
824     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
825         if (m->aborted) {
826             status = APR_ECONNABORTED;
827         }
828         else {
829             h2_ihash_add(m->streams, stream);
830             if (stream->response) {
831                 /* already have a respone, schedule for submit */
832                 h2_ihash_add(m->sready, stream);
833             }
834             else {
835                 h2_beam_create(&stream->input, stream->pool, stream->id, 
836                                "input", 0);
837                 if (!m->need_registration) {
838                     m->need_registration = h2_iq_empty(m->q);
839                 }
840                 if (m->workers_busy < m->workers_max) {
841                     do_registration = m->need_registration;
842                 }
843                 h2_iq_add(m->q, stream->id, cmp, ctx);
844                 
845                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
846                               "h2_mplx(%ld-%d): process, body=%d", 
847                               m->c->id, stream->id, stream->request->body);
848             }
849         }
850         leave_mutex(m, acquired);
851     }
852     if (do_registration) {
853         m->need_registration = 0;
854         h2_workers_register(m->workers, m);
855     }
856     return status;
857 }
858
859 static h2_task *pop_task(h2_mplx *m)
860 {
861     h2_task *task = NULL;
862     h2_stream *stream;
863     int sid;
864     while (!m->aborted && !task  && (m->workers_busy < m->workers_limit)
865            && (sid = h2_iq_shift(m->q)) > 0) {
866         
867         stream = h2_ihash_get(m->streams, sid);
868         if (stream) {
869             conn_rec *slave, **pslave;
870             int new_conn = 0;
871
872             pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
873             if (pslave) {
874                 slave = *pslave;
875             }
876             else {
877                 slave = h2_slave_create(m->c, m->pool, NULL);
878                 new_conn = 1;
879             }
880             
881             slave->sbh = m->c->sbh;
882             slave->aborted = 0;
883             task = h2_task_create(slave, stream->request, stream->input, m);
884             h2_ihash_add(m->tasks, task);
885             
886             m->c->keepalives++;
887             apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
888             if (new_conn) {
889                 h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
890             }
891             stream->started = 1;
892             task->worker_started = 1;
893             task->started_at = apr_time_now();
894             if (sid > m->max_stream_started) {
895                 m->max_stream_started = sid;
896             }
897
898             if (stream->input) {
899                 h2_beam_timeout_set(stream->input, m->stream_timeout);
900                 h2_beam_on_consumed(stream->input, stream_input_consumed, m);
901                 h2_beam_on_file_beam(stream->input, can_beam_file, m);
902                 h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
903             }
904
905             ++m->workers_busy;
906         }
907     }
908     return task;
909 }
910
911 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
912 {
913     h2_task *task = NULL;
914     apr_status_t status;
915     int acquired;
916     
917     AP_DEBUG_ASSERT(m);
918     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
919         if (m->aborted) {
920             *has_more = 0;
921         }
922         else {
923             task = pop_task(m);
924             *has_more = !h2_iq_empty(m->q);
925         }
926         
927         if (has_more && !task) {
928             m->need_registration = 1;
929         }
930         leave_mutex(m, acquired);
931     }
932     return task;
933 }
934
935 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
936 {
937     if (task->frozen) {
938         /* this task was handed over to an engine for processing 
939          * and the original worker has finished. That means the 
940          * engine may start processing now. */
941         h2_task_thaw(task);
942         /* we do not want the task to block on writing response
943          * bodies into the mplx. */
944         h2_task_set_io_blocking(task, 0);
945         apr_thread_cond_broadcast(m->task_thawed);
946         return;
947     }
948     else {
949         h2_stream *stream;
950         
951         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
952                       "h2_mplx(%ld): task(%s) done", m->id, task->id);
953         out_close(m, task);
954         stream = h2_ihash_get(m->streams, task->stream_id);
955         
956         if (ngn) {
957             apr_off_t bytes = 0;
958             if (task->output.beam) {
959                 h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
960                 bytes += h2_beam_get_buffered(task->output.beam);
961             }
962             if (bytes > 0) {
963                 /* we need to report consumed and current buffered output
964                  * to the engine. The request will be streamed out or cancelled,
965                  * no more data is coming from it and the engine should update
966                  * its calculations before we destroy this information. */
967                 h2_req_engine_out_consumed(ngn, task->c, bytes);
968             }
969         }
970         
971         if (task->engine) {
972             if (!h2_req_engine_is_shutdown(task->engine)) {
973                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
974                               "h2_mplx(%ld): task(%s) has not-shutdown "
975                               "engine(%s)", m->id, task->id, 
976                               h2_req_engine_get_id(task->engine));
977             }
978             h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
979         }
980         
981         if (!m->aborted && stream && m->redo_tasks
982             && h2_ihash_get(m->redo_tasks, task->stream_id)) {
983             /* reset and schedule again */
984             h2_task_redo(task);
985             h2_ihash_remove(m->redo_tasks, task->stream_id);
986             h2_iq_add(m->q, task->stream_id, NULL, NULL);
987             return;
988         }
989         
990         task->worker_done = 1;
991         task->done_at = apr_time_now();
992         if (task->output.beam) {
993             h2_beam_on_consumed(task->output.beam, NULL, NULL);
994             h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
995         }
996         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
997                       "h2_mplx(%s): request done, %f ms elapsed", task->id, 
998                       (task->done_at - task->started_at) / 1000.0);
999         if (task->started_at > m->last_idle_block) {
1000             /* this task finished without causing an 'idle block', e.g.
1001              * a block by flow control.
1002              */
1003             if (task->done_at- m->last_limit_change >= m->limit_change_interval
1004                 && m->workers_limit < m->workers_max) {
1005                 /* Well behaving stream, allow it more workers */
1006                 m->workers_limit = H2MIN(m->workers_limit * 2, 
1007                                          m->workers_max);
1008                 m->last_limit_change = task->done_at;
1009                 m->need_registration = 1;
1010                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1011                               "h2_mplx(%ld): increase worker limit to %d",
1012                               m->id, m->workers_limit);
1013             }
1014         }
1015         
1016         if (stream) {
1017             /* hang around until the stream deregisters */
1018             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1019                           "h2_mplx(%s): task_done, stream still open", 
1020                           task->id);
1021             if (h2_stream_is_suspended(stream)) {
1022                 /* more data will not arrive, resume the stream */
1023                 h2_ihash_add(m->sresume, stream);
1024                 have_out_data_for(m, stream->id);
1025             }
1026         }
1027         else {
1028             /* stream no longer active, was it placed in hold? */
1029             stream = h2_ihash_get(m->shold, task->stream_id);
1030             if (stream) {
1031                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1032                               "h2_mplx(%s): task_done, stream in hold", 
1033                               task->id);
1034                 /* We cannot destroy the stream here since this is 
1035                  * called from a worker thread and freeing memory pools
1036                  * is only safe in the only thread using it (and its
1037                  * parent pool / allocator) */
1038                 h2_ihash_remove(m->shold, stream->id);
1039                 h2_ihash_add(m->spurge, stream);
1040             }
1041             else {
1042                 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1043                               "h2_mplx(%s): task_done, stream not found", 
1044                               task->id);
1045                 task_destroy(m, task, 0);
1046             }
1047             
1048             if (m->join_wait) {
1049                 apr_thread_cond_signal(m->join_wait);
1050             }
1051         }
1052     }
1053 }
1054
1055 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
1056 {
1057     int acquired;
1058     
1059     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
1060         task_done(m, task, NULL);
1061         --m->workers_busy;
1062         if (ptask) {
1063             /* caller wants another task */
1064             *ptask = pop_task(m);
1065         }
1066         leave_mutex(m, acquired);
1067     }
1068 }
1069
1070 /*******************************************************************************
1071  * h2_mplx DoS protection
1072  ******************************************************************************/
1073
1074 static int latest_repeatable_unsubmitted_iter(void *data, void *val)
1075 {
1076     task_iter_ctx *ctx = data;
1077     h2_task *task = val;
1078     if (!task->worker_done && h2_task_can_redo(task) 
1079         && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
1080         /* this task occupies a worker, the response has not been submitted yet,
1081          * not been cancelled and it is a repeatable request
1082          * -> it can be re-scheduled later */
1083         if (!ctx->task || ctx->task->started_at < task->started_at) {
1084             /* we did not have one or this one was started later */
1085             ctx->task = task;
1086         }
1087     }
1088     return 1;
1089 }
1090
1091 static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) 
1092 {
1093     task_iter_ctx ctx;
1094     ctx.m = m;
1095     ctx.task = NULL;
1096     h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
1097     return ctx.task;
1098 }
1099
1100 static int timed_out_busy_iter(void *data, void *val)
1101 {
1102     task_iter_ctx *ctx = data;
1103     h2_task *task = val;
1104     if (!task->worker_done
1105         && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
1106         /* timed out stream occupying a worker, found */
1107         ctx->task = task;
1108         return 0;
1109     }
1110     return 1;
1111 }
1112
1113 static h2_task *get_timed_out_busy_task(h2_mplx *m) 
1114 {
1115     task_iter_ctx ctx;
1116     ctx.m = m;
1117     ctx.task = NULL;
1118     ctx.now = apr_time_now();
1119     h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
1120     return ctx.task;
1121 }
1122
1123 static apr_status_t unschedule_slow_tasks(h2_mplx *m) 
1124 {
1125     h2_task *task;
1126     int n;
1127     
1128     if (!m->redo_tasks) {
1129         m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
1130     }
1131     /* Try to get rid of streams that occupy workers. Look for safe requests
1132      * that are repeatable. If none found, fail the connection.
1133      */
1134     n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
1135     while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
1136         h2_task_rst(task, H2_ERR_CANCEL);
1137         h2_ihash_add(m->redo_tasks, task);
1138         --n;
1139     }
1140     
1141     if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
1142         task = get_timed_out_busy_task(m);
1143         if (task) {
1144             /* Too many busy workers, unable to cancel enough streams
1145              * and with a busy, timed out stream, we tell the client
1146              * to go away... */
1147             return APR_TIMEUP;
1148         }
1149     }
1150     return APR_SUCCESS;
1151 }
1152
1153 apr_status_t h2_mplx_idle(h2_mplx *m)
1154 {
1155     apr_status_t status = APR_SUCCESS;
1156     apr_time_t now;            
1157     int acquired;
1158     
1159     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
1160         apr_size_t scount = h2_ihash_count(m->streams);
1161         if (scount > 0 && m->workers_busy) {
1162             /* If we have streams in connection state 'IDLE', meaning
1163              * all streams are ready to sent data out, but lack
1164              * WINDOW_UPDATEs. 
1165              * 
1166              * This is ok, unless we have streams that still occupy
1167              * h2 workers. As worker threads are a scarce resource, 
1168              * we need to take measures that we do not get DoSed.
1169              * 
1170              * This is what we call an 'idle block'. Limit the amount 
1171              * of busy workers we allow for this connection until it
1172              * well behaves.
1173              */
1174             now = apr_time_now();
1175             m->last_idle_block = now;
1176             if (m->workers_limit > 2 
1177                 && now - m->last_limit_change >= m->limit_change_interval) {
1178                 if (m->workers_limit > 16) {
1179                     m->workers_limit = 16;
1180                 }
1181                 else if (m->workers_limit > 8) {
1182                     m->workers_limit = 8;
1183                 }
1184                 else if (m->workers_limit > 4) {
1185                     m->workers_limit = 4;
1186                 }
1187                 else if (m->workers_limit > 2) {
1188                     m->workers_limit = 2;
1189                 }
1190                 m->last_limit_change = now;
1191                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1192                               "h2_mplx(%ld): decrease worker limit to %d",
1193                               m->id, m->workers_limit);
1194             }
1195             
1196             if (m->workers_busy > m->workers_limit) {
1197                 status = unschedule_slow_tasks(m);
1198             }
1199         }
1200         leave_mutex(m, acquired);
1201     }
1202     return status;
1203 }
1204
1205 /*******************************************************************************
1206  * HTTP/2 request engines
1207  ******************************************************************************/
1208
1209 typedef struct {
1210     h2_mplx * m;
1211     h2_req_engine *ngn;
1212     int streams_updated;
1213 } ngn_update_ctx;
1214
1215 static int ngn_update_window(void *ctx, void *val)
1216 {
1217     ngn_update_ctx *uctx = ctx;
1218     h2_task *task = val;
1219     if (task && task->assigned == uctx->ngn
1220         && output_consumed_signal(uctx->m, task)) {
1221         ++uctx->streams_updated;
1222     }
1223     return 1;
1224 }
1225
1226 static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
1227 {
1228     ngn_update_ctx ctx;
1229         
1230     ctx.m = m;
1231     ctx.ngn = ngn;
1232     ctx.streams_updated = 0;
1233     h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
1234     
1235     return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
1236 }
1237
1238 apr_status_t h2_mplx_req_engine_push(const char *ngn_type, 
1239                                      request_rec *r,
1240                                      http2_req_engine_init *einit)
1241 {
1242     apr_status_t status;
1243     h2_mplx *m;
1244     h2_task *task;
1245     int acquired;
1246     
1247     task = h2_ctx_rget_task(r);
1248     if (!task) {
1249         return APR_ECONNABORTED;
1250     }
1251     m = task->mplx;
1252     task->r = r;
1253     
1254     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1255         h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
1256         
1257         if (stream) {
1258             status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
1259         }
1260         else {
1261             status = APR_ECONNABORTED;
1262         }
1263         leave_mutex(m, acquired);
1264     }
1265     return status;
1266 }
1267
1268 apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, 
1269                                      apr_read_type_e block, 
1270                                      apr_uint32_t capacity, 
1271                                      request_rec **pr)
1272 {   
1273     h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
1274     h2_mplx *m = h2_ngn_shed_get_ctx(shed);
1275     apr_status_t status;
1276     h2_task *task = NULL;
1277     int acquired;
1278     
1279     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1280         int want_shutdown = (block == APR_BLOCK_READ);
1281
1282         /* Take this opportunity to update output consummation 
1283          * for this engine */
1284         ngn_out_update_windows(m, ngn);
1285         
1286         if (want_shutdown && !h2_iq_empty(m->q)) {
1287             /* For a blocking read, check first if requests are to be
1288              * had and, if not, wait a short while before doing the
1289              * blocking, and if unsuccessful, terminating read.
1290              */
1291             status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
1292             if (APR_STATUS_IS_EAGAIN(status)) {
1293                 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1294                               "h2_mplx(%ld): start block engine pull", m->id);
1295                 apr_thread_cond_timedwait(m->task_thawed, m->lock, 
1296                                           apr_time_from_msec(20));
1297                 status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
1298             }
1299         }
1300         else {
1301             status = h2_ngn_shed_pull_task(shed, ngn, capacity,
1302                                            want_shutdown, &task);
1303         }
1304         leave_mutex(m, acquired);
1305     }
1306     *pr = task? task->r : NULL;
1307     return status;
1308 }
1309  
1310 void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
1311 {
1312     h2_task *task = h2_ctx_cget_task(r_conn);
1313     
1314     if (task) {
1315         h2_mplx *m = task->mplx;
1316         int acquired;
1317
1318         if (enter_mutex(m, &acquired) == APR_SUCCESS) {
1319             ngn_out_update_windows(m, ngn);
1320             h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
1321             if (task->engine) { 
1322                 /* cannot report that as done until engine returns */
1323             }
1324             else {
1325                 task_done(m, task, ngn);
1326             }
1327             /* Take this opportunity to update output consummation 
1328              * for this engine */
1329             leave_mutex(m, acquired);
1330         }
1331     }
1332 }
1333
1334 /*******************************************************************************
1335  * mplx master events dispatching
1336  ******************************************************************************/
1337
1338 static int update_window(void *ctx, void *val)
1339 {
1340     input_consumed_signal(ctx, val);
1341     return 1;
1342 }
1343
1344 apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
1345                                             stream_ev_callback *on_resume, 
1346                                             stream_ev_callback *on_response, 
1347                                             void *on_ctx)
1348 {
1349     apr_status_t status;
1350     int acquired;
1351     int streams[32];
1352     h2_stream *stream;
1353     h2_task *task;
1354     size_t i, n;
1355     
1356     AP_DEBUG_ASSERT(m);
1357     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1358         ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
1359                       "h2_mplx(%ld): dispatch events", m->id);
1360                       
1361         /* update input windows for streams */
1362         h2_ihash_iter(m->streams, update_window, m);
1363
1364         if (on_response && !h2_ihash_empty(m->sready)) {
1365             n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
1366             for (i = 0; i < n; ++i) {
1367                 stream = h2_ihash_get(m->streams, streams[i]);
1368                 if (!stream) {
1369                     continue;
1370                 }
1371                 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
1372                               "h2_mplx(%ld-%d): on_response", 
1373                               m->id, stream->id);
1374                 task = h2_ihash_get(m->tasks, stream->id);
1375                 if (task) {
1376                     task->submitted = 1;
1377                     if (task->rst_error) {
1378                         h2_stream_rst(stream, task->rst_error);
1379                     }
1380                     else {
1381                         AP_DEBUG_ASSERT(task->response);
1382                         h2_stream_set_response(stream, task->response, task->output.beam);
1383                     }
1384                 }
1385                 else {
1386                     /* We have the stream ready without a task. This happens
1387                      * when we fail streams early. A response should already
1388                      * be present.  */
1389                     AP_DEBUG_ASSERT(stream->response || stream->rst_error);
1390                 }
1391                 status = on_response(on_ctx, stream->id);
1392             }
1393         }
1394
1395         if (on_resume && !h2_ihash_empty(m->sresume)) {
1396             n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
1397             for (i = 0; i < n; ++i) {
1398                 stream = h2_ihash_get(m->streams, streams[i]);
1399                 if (!stream) {
1400                     continue;
1401                 }
1402                 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
1403                               "h2_mplx(%ld-%d): on_resume", 
1404                               m->id, stream->id);
1405                 h2_stream_set_suspended(stream, 0);
1406                 status = on_resume(on_ctx, stream->id);
1407             }
1408         }
1409         
1410         leave_mutex(m, acquired);
1411     }
1412     return status;
1413 }
1414
1415 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
1416 {
1417     h2_mplx *m = ctx;
1418     apr_status_t status;
1419     h2_stream *stream;
1420     int acquired;
1421     
1422     AP_DEBUG_ASSERT(m);
1423     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1424         stream = h2_ihash_get(m->streams, beam->id);
1425         if (stream && h2_stream_is_suspended(stream)) {
1426             h2_ihash_add(m->sresume, stream);
1427             h2_beam_on_produced(beam, NULL, NULL);
1428             have_out_data_for(m, beam->id);
1429         }
1430         leave_mutex(m, acquired);
1431     }
1432 }
1433
1434 apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
1435 {
1436     apr_status_t status;
1437     h2_stream *stream;
1438     h2_task *task;
1439     int acquired;
1440     
1441     AP_DEBUG_ASSERT(m);
1442     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1443         stream = h2_ihash_get(m->streams, stream_id);
1444         if (stream) {
1445             h2_stream_set_suspended(stream, 1);
1446             task = h2_ihash_get(m->tasks, stream->id);
1447             if (stream->started && (!task || task->worker_done)) {
1448                 h2_ihash_add(m->sresume, stream);
1449             }
1450             else {
1451                 /* register callback so that we can resume on new output */
1452                 h2_beam_on_produced(task->output.beam, output_produced, m);
1453             }
1454         }
1455         leave_mutex(m, acquired);
1456     }
1457     return status;
1458 }