1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22 #include <apr_strings.h>
26 #include <http_core.h>
29 #include "mod_http2.h"
31 #include "h2_private.h"
32 #include "h2_bucket_beam.h"
33 #include "h2_config.h"
37 #include "h2_response.h"
39 #include "h2_ngn_shed.h"
40 #include "h2_request.h"
41 #include "h2_stream.h"
43 #include "h2_worker.h"
44 #include "h2_workers.h"
/* Debug helper: writes a one-line summary of a bucket beam to the log of
 * connection `c`. Nothing is formatted or logged unless `beam` is non-NULL
 * and `level` is enabled for `c`. The summary consists of the beam's closed
 * flag followed by its red, green, hold and purge bucket lists and is
 * emitted as "beam(<c->id>-<id>): <msg> <summary>". */
48 static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg,
49 conn_rec *c, int level)
51 if (beam && APLOG_C_IS_LEVEL(c,level)) {
55 off += apr_snprintf(buffer+off, H2_ALEN(buffer)-off, "cl=%d, ", beam->closed);
56 off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "red", ", ", &beam->red);
57 off += h2_util_bb_print(buffer+off, H2_ALEN(buffer)-off, "green", ", ", beam->green);
58 off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "hold", ", ", &beam->hold);
59 off += h2_util_bl_print(buffer+off, H2_ALEN(buffer)-off, "purge", "", &beam->purge);
61 ap_log_cerror(APLOG_MARK, level, 0, c, "beam(%ld-%d): %s %s",
62 c->id, id, msg, buffer);
66 /* utility for iterating over ihash task sets */
/* Thread-private key. enter_mutex() stores m->lock under it after locking,
 * and leave_mutex() resets it to NULL, so a thread can find out whether it
 * already holds the mplx mutex. */
73 /* NULL or the mutex hold by this thread, used for recursive calls
75 static apr_threadkey_t *thread_lock;
/* Creates the thread-private key `thread_lock` in `pool` (no destructor is
 * registered) and returns the status of apr_threadkey_private_create().
 * `s` is not referenced by the call shown here. */
77 apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s)
79 return apr_threadkey_private_create(&thread_lock, NULL, pool);
/* Enters the mplx mutex for the calling thread.
 *
 * The thread-private slot `thread_lock` is read first; if it already holds
 * m->lock the thread is treated as being inside the mutex. Otherwise
 * m->lock is locked, *pacquired is set to whether that lock call returned
 * APR_SUCCESS, and m->lock is stored in the thread-private slot.
 *
 * m:         the multiplexer whose lock is entered.
 * pacquired: out; non-zero when this call took the lock, to be passed on to
 *            leave_mutex().
 * NOTE(review): presumably returns the lock status, or APR_SUCCESS for the
 * recursive case - confirm. */
82 static apr_status_t enter_mutex(h2_mplx *m, int *pacquired)
87 /* Enter the mutex if this thread already holds the lock or
88 * if we can acquire it. Only on the later case do we unlock
89 * onleaving the mutex.
90 * This allow recursive entering of the mutex from the saem thread,
91 * which is what we need in certain situations involving callbacks
94 apr_threadkey_private_get(&mutex, thread_lock);
95 if (mutex == m->lock) {
100 AP_DEBUG_ASSERT(m->lock);
101 status = apr_thread_mutex_lock(m->lock);
102 *pacquired = (status == APR_SUCCESS);
104 apr_threadkey_private_set(m->lock, thread_lock);
/* Counterpart of enter_mutex(): resets the thread-private slot to NULL and
 * unlocks m->lock.
 * NOTE(review): presumably only done when `acquired` is non-zero, i.e. when
 * the matching enter_mutex() actually took the lock - confirm. */
109 static void leave_mutex(h2_mplx *m, int acquired)
112 apr_threadkey_private_set(NULL, thread_lock);
113 apr_thread_mutex_unlock(m->lock);
/* Lock-release callback handed to bucket beams: beam_enter() installs it as
 * pbl->leave when it newly acquired the mplx mutex.
 * NOTE(review): presumably `ctx` is the h2_mplx and this releases its mutex
 * - confirm. */
117 static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
/* Lock-acquire callback handed to bucket beams via h2_beam_mutex_set(),
 * where the h2_mplx is registered as `ctx`. Enters the mplx mutex with
 * enter_mutex(); on success fills *pbl with m->lock and a leave callback:
 * beam_leave when the lock was newly acquired, NULL when the calling thread
 * already held it. */
122 static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
128 status = enter_mutex(m, &acquired);
129 if (status == APR_SUCCESS) {
130 pbl->mutex = m->lock;
131 pbl->leave = acquired? beam_leave : NULL;
/* Consumed-callback for a task's output beam (registered in out_open() with
 * the h2_task as `ctx`). When a positive `length` was consumed and the task
 * has an assigned request engine, the amount is reported to that engine via
 * h2_req_engine_out_consumed(). */
137 static void stream_output_consumed(void *ctx,
138 h2_bucket_beam *beam, apr_off_t length)
141 if (length > 0 && task && task->assigned) {
142 h2_req_engine_out_consumed(task->assigned, task->c, length);
/* Consumed-callback for a stream's input beam (registered in pop_task() with
 * the h2_mplx as `ctx`). Forwards the beam id and a non-zero `length` to the
 * callback installed through h2_mplx_set_consumed_cb(), if there is one. */
146 static void stream_input_consumed(void *ctx,
147 h2_bucket_beam *beam, apr_off_t length)
150 if (m->input_consumed && length) {
151 m->input_consumed(m->input_consumed_ctx, beam->id, length);
/* Beam callback asked before a file is passed through a beam. When
 * m->tx_handles_reserved is positive, one reserved handle is consumed and
 * the transfer is logged at TRACE3; otherwise a denial is logged.
 * NOTE(review): presumably returns non-zero in the first case and zero in
 * the second - confirm. */
155 static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file)
158 if (m->tx_handles_reserved > 0) {
159 --m->tx_handles_reserved;
160 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
161 "h2_mplx(%ld-%d): beaming file %s, tx_avail %d",
162 m->id, beam->id, beam->tag, m->tx_handles_reserved);
165 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
166 "h2_mplx(%ld-%d): can_beam_file denied on %s",
167 m->id, beam->id, beam->tag);
/* Forward declarations for helpers that are used before their definitions
 * further down in this file. */
171 static void have_out_data_for(h2_mplx *m, int stream_id);
172 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
/* Tops up the reserved file-handle count: when none are reserved
 * (tx_handles_reserved <= 0), asks h2_workers_tx_reserve() for the smaller
 * of m->tx_chunk_size and the current number of tasks, and adds what was
 * granted to m->tx_handles_reserved. */
174 static void check_tx_reservation(h2_mplx *m)
176 if (m->tx_handles_reserved <= 0) {
177 m->tx_handles_reserved += h2_workers_tx_reserve(m->workers,
178 H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks)));
/* Gives surplus reserved file handles back to the workers. When more than
 * m->tx_chunk_size are reserved, the excess is freed and exactly
 * tx_chunk_size is kept. Otherwise, when some are reserved but there are no
 * tasks left, all of them are freed. */
182 static void check_tx_free(h2_mplx *m)
184 if (m->tx_handles_reserved > m->tx_chunk_size) {
185 apr_size_t count = m->tx_handles_reserved - m->tx_chunk_size;
186 m->tx_handles_reserved = m->tx_chunk_size;
187 h2_workers_tx_free(m->workers, count);
189 else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) {
190 h2_workers_tx_free(m->workers, m->tx_handles_reserved);
191 m->tx_handles_reserved = 0;
/* h2_ihash iterator callback used by purge_streams(). Looks up the task for
 * the stream in m->tasks, removes the stream from m->spurge, destroys the
 * stream and destroys the task with called_from_master set to 1.
 * NOTE(review): task_destroy() is presumably only called when a task was
 * found; the return value presumably stops the iteration after each entry,
 * since the caller loops until the iteration reports completion - confirm. */
195 static int purge_stream(void *ctx, void *val)
198 h2_stream *stream = val;
199 h2_task *task = h2_ihash_get(m->tasks, stream->id);
200 h2_ihash_remove(m->spurge, stream->id);
201 h2_stream_destroy(stream);
203 task_destroy(m, task, 1);
/* Destroys every stream parked in m->spurge. The iteration is restarted
 * until h2_ihash_iter() returns non-zero, because purge_stream() removes
 * entries while iterating; afterwards the hash is cleared. */
208 static void purge_streams(h2_mplx *m)
210 if (!h2_ihash_empty(m->spurge)) {
211 while(!h2_ihash_iter(m->spurge, purge_stream, m)) {
212 /* repeat until empty */
214 h2_ihash_clear(m->spurge);
/* Logs the number of remaining tasks at TRACE1 and destroys m->pool, the
 * pool created for this mplx in h2_mplx_create(). */
218 static void h2_mplx_destroy(h2_mplx *m)
221 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
222 "h2_mplx(%ld): destroy, tasks=%d",
223 m->id, (int)h2_ihash_count(m->tasks));
226 apr_pool_destroy(m->pool);
231 * A h2_mplx needs to be thread-safe *and* it will be called by
232 * the h2_session thread *and* the h2_worker threads. Therefore:
233 * - calls are protected by a mutex lock, m->lock
234 * - the pool needs its own allocator, since apr_allocator_t are
235 * not re-entrant. The separate allocator works without a
236 * separate lock since we already protect h2_mplx itself.
237 * Since HTTP/2 connections can be expected to live longer than
238 * their HTTP/1 cousins, the separate allocator seems to work better
239 * than protecting a shared h2_session one with an own lock.
/* Allocates and initialises a multiplexer for one master connection.
 *
 * The h2_mplx struct itself is allocated from `parent`; everything else
 * lives in m->pool, a child pool of `parent` that uses its own allocator
 * (the allocator's owner is set to that pool). The mutex and the
 * task_thawed condition are created there, as are the bucket allocator,
 * the stream sets (streams, shold, spurge, sready, sresume), the schedule
 * queue, the task set, the spare-slave array (initial capacity 10) and the
 * engine shed.
 *
 * Initial tuning values set here: worker limit 4 (workers_def_limit),
 * limit change interval 200 ms, tx_chunk_size 4, no tx handles reserved.
 *
 * c:              master connection.
 * parent:         pool the mplx is allocated from.
 * conf:           supplies H2_CONF_MAX_STREAMS and H2_CONF_STREAM_MAX_MEM.
 * stream_timeout: stored as m->stream_timeout and later applied to beams.
 * The worker set stored as m->workers is taken from a further parameter.
 * NOTE(review): presumably returns NULL when the allocator, mutex or
 * condition cannot be created - confirm. */
241 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
242 const h2_config *conf,
243 apr_interval_time_t stream_timeout,
246 apr_status_t status = APR_SUCCESS;
247 apr_allocator_t *allocator = NULL;
249 AP_DEBUG_ASSERT(conf);
251 status = apr_allocator_create(&allocator);
252 if (status != APR_SUCCESS) {
256 m = apr_pcalloc(parent, sizeof(h2_mplx));
259 APR_RING_ELEM_INIT(m, link);
261 apr_pool_create_ex(&m->pool, parent, NULL, allocator);
265 apr_pool_tag(m->pool, "h2_mplx");
266 apr_allocator_owner_set(allocator, m->pool);
268 status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
270 if (status != APR_SUCCESS) {
275 status = apr_thread_cond_create(&m->task_thawed, m->pool);
276 if (status != APR_SUCCESS) {
281 m->bucket_alloc = apr_bucket_alloc_create(m->pool);
282 m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS);
283 m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
285 m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
286 m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
287 m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
288 m->q = h2_iq_create(m->pool, m->max_streams);
289 m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
290 m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
291 m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
293 m->stream_timeout = stream_timeout;
294 m->workers = workers;
295 m->workers_max = workers->max_workers;
296 m->workers_def_limit = 4;
297 m->workers_limit = m->workers_def_limit;
298 m->last_limit_change = m->last_idle_block = apr_time_now();
299 m->limit_change_interval = apr_time_from_msec(200);
301 m->tx_handles_reserved = 0;
302 m->tx_chunk_size = 4;
304 m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*));
306 m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams,
308 h2_ngn_shed_set_ctx(m->ngn_shed , m);
/* Begins shutdown of the mplx. Under the mplx mutex, reads
 * m->max_stream_started and returns it; the result stays 0 when the mutex
 * could not be entered. */
313 apr_uint32_t h2_mplx_shutdown(h2_mplx *m)
315 int acquired, max_stream_started = 0;
317 if (enter_mutex(m, &acquired) == APR_SUCCESS) {
318 max_stream_started = m->max_stream_started;
319 /* Clear schedule queue, disabling existing streams from starting */
321 leave_mutex(m, acquired);
323 return max_stream_started;
/* For a stream that has an input beam and has been started, performs an
 * empty h2_beam_send() (no brigade) on the input beam so that pending
 * consumption updates get reported. */
326 static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
328 if (stream->input && stream->started) {
329 h2_beam_send(stream->input, NULL, 0); /* trigger updates */
/* For a task with an output beam whose worker has started and which is
 * assigned to an engine, performs an empty non-blocking h2_beam_send() on
 * the output beam to trigger consumption updates. ngn_update_window()
 * counts a non-zero return value as "stream updated".
 * NOTE(review): the exact return values are not shown here - confirm. */
333 static int output_consumed_signal(h2_mplx *m, h2_task *task)
335 if (task->output.beam && task->worker_started && task->assigned) {
336 /* trigger updates */
337 h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
/* Tears down a task and recycles or destroys its slave connection.
 *
 * When called from the master connection, pending input-consumed events of
 * the task's stream are flushed first. If the task has an output beam, the
 * number of files beamed through it is credited back to
 * m->tx_handles_reserved, its produced-callback is removed, and the beam is
 * shut down non-blocking; an incomplete shutdown is logged as a warning.
 * The task is removed from m->tasks and m->redo_tasks and destroyed.
 *
 * The slave connection is kept in m->spare_slaves only when the array has
 * spare capacity, the task was not reset, and the slave is in keepalive
 * state; otherwise it is destroyed.
 *
 * called_from_master: non-zero when invoked on the master connection. */
343 static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
345 conn_rec *slave = NULL;
349 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
350 "h2_task(%s): destroy", task->id);
351 if (called_from_master) {
352 /* Process outstanding events before destruction */
353 h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
355 input_consumed_signal(m, stream);
359 /* The pool is cleared/destroyed which also closes all
360 * allocated file handles. Give this count back to our
361 * file handle pool. */
362 if (task->output.beam) {
363 m->tx_handles_reserved +=
364 h2_beam_get_files_beamed(task->output.beam);
365 h2_beam_on_produced(task->output.beam, NULL, NULL);
366 status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
367 if (status != APR_SUCCESS){
368 ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c,
369 APLOGNO(03385) "h2_task(%s): output shutdown "
370 "incomplete", task->id);
375 reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc)
376 && !task->rst_error);
378 h2_ihash_remove(m->tasks, task->stream_id);
380 h2_ihash_remove(m->redo_tasks, task->stream_id);
382 h2_task_destroy(task);
385 if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) {
386 APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave;
390 h2_slave_destroy(slave, NULL);
/* Finishes a stream on the master connection.
 *
 * The stream is removed from the schedule queue and from the sready,
 * sresume and streams sets. For its input beam, the files beamed are
 * credited back to m->tx_handles_reserved, the consumed-callback is
 * removed, the beam is aborted and only then detached from the mplx mutex.
 * h2_stream_cleanup() is called next.
 *
 * If a task exists for the stream and its worker is not done yet, the task
 * is reset with `rst_error` and the stream is parked in m->shold;
 * a finished task is destroyed right away.
 * NOTE(review): the final h2_stream_destroy() is presumably skipped for
 * streams placed on hold - confirm. */
397 static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
401 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
402 "h2_stream(%ld-%d): done", m->c->id, stream->id);
403 /* Situation: we are, on the master connection, done with processing
404 * the stream. Either we have handled it successfully, or the stream
405 * was reset by the client or the connection is gone and we are
406 * shutting down the whole session.
408 * We possibly have created a task for this stream to be processed
409 * on a slave connection. The processing might actually be ongoing
410 * right now or has already finished. A finished task waits for its
411 * stream to be done. This is the common case.
413 * If the stream had input (e.g. the request had a body), a task
414 * may have read, or is still reading buckets from the input beam.
415 * This means that the task is referencing memory from the stream's
416 * pool (or the master connection bucket alloc). Before we can free
417 * the stream pool, we need to make sure that those references are
418 * gone. This is what h2_beam_shutdown() on the input waits for.
420 * With the input handled, we can tear down that beam and care
421 * about the output beam. The stream might still have buffered some
422 * buckets read from the output, so we need to get rid of those. That
423 * is done by h2_stream_cleanup().
425 * Now it is save to destroy the task (if it exists and is finished).
427 * FIXME: we currently destroy the stream, even if the task is still
428 * ongoing. This is not ok, since task->request is coming from stream
429 * memory. We should either copy it on task creation or wait with the
430 * stream destruction until the task is done.
432 h2_iq_remove(m->q, stream->id);
433 h2_ihash_remove(m->sready, stream->id);
434 h2_ihash_remove(m->sresume, stream->id);
435 h2_ihash_remove(m->streams, stream->id);
437 m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
438 h2_beam_on_consumed(stream->input, NULL, NULL);
439 /* Let anyone blocked reading know that there is no more to come */
440 h2_beam_abort(stream->input);
441 /* Remove mutex after, so that abort still finds cond to signal */
442 h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
444 h2_stream_cleanup(stream);
446 task = h2_ihash_get(m->tasks, stream->id);
448 if (!task->worker_done) {
449 /* task still running, cleanup once it is done */
451 h2_task_rst(task, rst_error);
453 h2_ihash_add(m->shold, stream);
457 /* already finished */
458 task_destroy(m, task, 0);
461 h2_stream_destroy(stream);
/* h2_ihash iterator adapter: calls stream_done() for the stream `val` on
 * the mplx `ctx` with rst_error 0. Used by h2_mplx_release_and_join(). */
464 static int stream_done_iter(void *ctx, void *val)
466 stream_done((h2_mplx*)ctx, val, 0);
/* h2_ihash iterator used by h2_mplx_release_and_join() to describe a task
 * that is still outstanding. For a task with a request it logs, at WARNING
 * level, the task id, request method/authority/path, either the response
 * status or the reset error, whether its stream is no longer in m->streams
 * ("orph") and the worker state flags. Shorter fallback messages exist for
 * a task without request and for a NULL task. */
470 static int task_print(void *ctx, void *val)
475 if (task && task->request) {
476 h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
478 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
479 "->03198: h2_stream(%s): %s %s %s -> %s %d"
480 "[orph=%d/started=%d/done=%d]",
481 task->id, task->request->method,
482 task->request->authority, task->request->path,
483 task->response? "http" : (task->rst_error? "reset" : "?"),
484 task->response? task->response->http_status : task->rst_error,
485 (stream? 0 : 1), task->worker_started,
489 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
490 "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
493 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
494 "->03198: h2_stream(%ld-NULL): NULL", m->id);
/* h2_ihash iterator used while joining workers: marks the task's slave
 * connection as aborted and aborts the task's input and output beams when
 * they exist. */
499 static int task_abort_connection(void *ctx, void *val)
503 task->c->aborted = 1;
505 if (task->input.beam) {
506 h2_beam_abort(task->input.beam);
508 if (task->output.beam) {
509 h2_beam_abort(task->output.beam);
/* h2_ihash iterator that logs, at TRACE1, the started / scheduled /
 * submitted / suspended flags of a stream that still exists when the mplx
 * is released. */
514 static int report_stream_iter(void *ctx, void *val) {
516 h2_stream *stream = val;
517 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
518 "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
519 "submitted=%d, suspended=%d",
520 m->id, stream->id, stream->started, stream->scheduled,
521 stream->submitted, stream->suspended);
/* Releases the mplx at the end of a session and waits for its workers.
 *
 * Sequence as far as visible here:
 *  1. unregister the mplx from the worker set (before taking the mutex);
 *  2. under the mutex: optionally log all open streams, disable the
 *     input-consumed callback, log streams in hold / to purge;
 *  3. wake engines waiting on task_thawed and run stream_done() on every
 *     stream until m->streams is empty;
 *  4. while m->workers_busy > 0: abort all task connections and wait on
 *     `wait` (with m->lock) in slices of wait_secs = 5 seconds; each
 *     timeout logs warning 03198 plus one task_print() line per task;
 *  5. assert that shold and spurge are empty and log when tasks remain.
 *
 * wait: condition variable the caller provides for the join; task_done()
 *       signals m->join_wait (presumably this same condition - confirm). */
525 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
530 h2_workers_unregister(m->workers, m);
532 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
533 int i, wait_secs = 5;
535 if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
536 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
537 "h2_mplx(%ld): release_join with %d streams open, "
538 "%d streams resume, %d streams ready, %d tasks",
539 m->id, (int)h2_ihash_count(m->streams),
540 (int)h2_ihash_count(m->sresume),
541 (int)h2_ihash_count(m->sready),
542 (int)h2_ihash_count(m->tasks));
543 h2_ihash_iter(m->streams, report_stream_iter, m);
546 /* disable WINDOW_UPDATE callbacks */
547 h2_mplx_set_consumed_cb(m, NULL, NULL);
549 if (!h2_ihash_empty(m->shold)) {
550 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
551 "h2_mplx(%ld): start release_join with %d streams in hold",
552 m->id, (int)h2_ihash_count(m->shold));
554 if (!h2_ihash_empty(m->spurge)) {
555 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
556 "h2_mplx(%ld): start release_join with %d streams to purge",
557 m->id, (int)h2_ihash_count(m->spurge));
561 apr_thread_cond_broadcast(m->task_thawed);
562 while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
563 /* iterate until all streams have been removed */
565 AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
567 if (!h2_ihash_empty(m->shold)) {
568 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
569 "h2_mplx(%ld): 2. release_join with %d streams in hold",
570 m->id, (int)h2_ihash_count(m->shold));
572 if (!h2_ihash_empty(m->spurge)) {
573 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
574 "h2_mplx(%ld): 2. release_join with %d streams to purge",
575 m->id, (int)h2_ihash_count(m->spurge));
578 /* If we still have busy workers, we cannot release our memory
579 * pool yet, as tasks have references to us.
580 * Any operation on the task slave connection will from now on
581 * be errored ECONNRESET/ABORTED, so processing them should fail
582 * and workers *should* return in a timely fashion.
584 for (i = 0; m->workers_busy > 0; ++i) {
585 h2_ihash_iter(m->tasks, task_abort_connection, m);
588 status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
590 if (APR_STATUS_IS_TIMEUP(status)) {
592 /* Oh, oh. Still we wait for assigned workers to report that
593 * they are done. Unless we have a bug, a worker seems to be hanging.
594 * If we exit now, all will be deallocated and the worker, once
595 * it does return, will walk all over freed memory...
597 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
598 "h2_mplx(%ld): release, waiting for %d seconds now for "
599 "%d h2_workers to return, have still %d tasks outstanding",
600 m->id, i*wait_secs, m->workers_busy,
601 (int)h2_ihash_count(m->tasks));
603 h2_ihash_iter(m->tasks, task_print, m);
607 apr_thread_cond_broadcast(m->task_thawed);
611 AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
612 if (!h2_ihash_empty(m->spurge)) {
613 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
614 "h2_mplx(%ld): 3. release_join %d streams to purge",
615 m->id, (int)h2_ihash_count(m->spurge));
618 AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
620 if (!h2_ihash_empty(m->tasks)) {
621 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
622 "h2_mplx(%ld): release_join -> destroy, "
623 "%d tasks still present",
624 m->id, (int)h2_ihash_count(m->tasks));
626 leave_mutex(m, acquired);
/* Aborts the mplx. Does nothing when m->aborted is already set (that flag
 * is tested before the mutex is taken); otherwise enters the mutex and
 * aborts the engine shed.
 * NOTE(review): presumably also sets m->aborted - confirm. */
633 void h2_mplx_abort(h2_mplx *m)
638 if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
640 h2_ngn_shed_abort(m->ngn_shed);
641 leave_mutex(m, acquired);
/* Public, mutex-protected entry point for finishing a stream: logs at
 * TRACE2 and calls stream_done() with the stream's own rst_error.
 * NOTE(review): presumably returns the status of entering the mutex -
 * confirm. */
645 apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream)
647 apr_status_t status = APR_SUCCESS;
651 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
652 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
653 "h2_mplx(%ld-%d): marking stream as done.",
655 stream_done(m, stream, stream->rst_error);
657 leave_mutex(m, acquired);
/* Installs (or, with NULL, removes) the callback and context that
 * stream_input_consumed() invokes when input data has been consumed.
 * The fields are assigned without entering the mplx mutex here. */
662 void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
664 m->input_consumed = cb;
665 m->input_consumed_ctx = ctx;
/* Attaches a response to the task of `stream_id` and makes the stream
 * ready for submission. Expected to run with the mplx mutex held (its
 * callers enter it).
 *
 * Returns APR_ECONNABORTED when either the task or the stream is unknown.
 * Otherwise the response is set on the task; an existing output beam gets
 * its buffer size (m->stream_max_mem), timeout, consumed-callback,
 * file-beam callback and the mplx mutex, and files already beamed are
 * deducted from m->tx_handles_reserved. The stream is added to m->sready,
 * tx handles are topped up for responses with status below 300, and
 * waiters are notified through have_out_data_for().
 * NOTE(review): `response` is dereferenced in the trace log before the
 * later `response &&` test, so a NULL response would fail earlier -
 * confirm callers never pass NULL. */
668 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
670 apr_status_t status = APR_SUCCESS;
671 h2_task *task = h2_ihash_get(m->tasks, stream_id);
672 h2_stream *stream = h2_ihash_get(m->streams, stream_id);
674 if (!task || !stream) {
675 return APR_ECONNABORTED;
678 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
679 "h2_mplx(%s): open response: %d, rst=%d",
680 task->id, response->http_status, response->rst_error);
682 h2_task_set_response(task, response);
684 if (task->output.beam) {
685 h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
686 h2_beam_timeout_set(task->output.beam, m->stream_timeout);
687 h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
688 m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
689 h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
690 h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
693 h2_ihash_add(m->sready, stream);
694 if (response && response->http_status < 300) {
695 /* we might see some file buckets in the output, see
696 * if we have enough handles reserved. */
697 check_tx_reservation(m);
699 have_out_data_for(m, stream_id);
/* Public, mutex-protected wrapper around out_open(). Yields
 * APR_ECONNABORTED instead of opening the output in one case (presumably
 * when the mplx has been aborted - confirm); otherwise the status of
 * out_open() is used. */
703 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
709 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
711 status = APR_ECONNABORTED;
714 status = out_open(m, stream_id, response);
716 leave_mutex(m, acquired);
/* Closes the output side of a task. Returns APR_ECONNABORTED on two early
 * exits (presumably a missing task and a stream that is no longer in
 * m->streams - confirm).
 *
 * A task that has neither a response nor a reset error gets a synthetic
 * 500 response (h2_response_die(), allocated from m->pool) opened via
 * out_open(), so the stream can still be reset properly. Afterwards the
 * output beam, if any, is closed and logged, output-consumed updates are
 * triggered and waiters are notified through have_out_data_for(). */
721 static apr_status_t out_close(h2_mplx *m, h2_task *task)
723 apr_status_t status = APR_SUCCESS;
727 return APR_ECONNABORTED;
730 stream = h2_ihash_get(m->streams, task->stream_id);
732 return APR_ECONNABORTED;
735 if (!task->response && !task->rst_error) {
736 /* In case a close comes before a response was created,
737 * insert an error one so that our streams can properly reset.
739 h2_response *r = h2_response_die(task->stream_id, 500,
740 task->request, m->pool);
741 status = out_open(m, task->stream_id, r);
742 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
743 "h2_mplx(%s): close, no response, no rst", task->id);
745 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
746 "h2_mplx(%s): close", task->id);
747 if (task->output.beam) {
748 status = h2_beam_close(task->output.beam);
749 h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
752 output_consumed_signal(m, task);
753 have_out_data_for(m, task->stream_id);
/* Waits, for at most `timeout`, until output becomes available.
 *
 * Under the mutex: returns APR_SUCCESS immediately when m->sready or
 * m->sresume is non-empty. Otherwise `iowait` is published as
 * m->added_output (the condition have_out_data_for() signals), the thread
 * waits on it with m->lock, and m->added_output is reset to NULL
 * afterwards. One branch yields APR_ECONNABORTED (presumably for an
 * aborted mplx - confirm).
 *
 * timeout: APR interval; the trace message prints timeout/1000.0 as ms.
 * iowait:  condition variable owned by the caller. */
757 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
758 apr_thread_cond_t *iowait)
764 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
766 status = APR_ECONNABORTED;
768 else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
769 status = APR_SUCCESS;
773 m->added_output = iowait;
774 status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
775 if (APLOGctrace2(m->c)) {
776 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
777 "h2_mplx(%ld): trywait on data for %f ms)",
778 m->id, timeout/1000.0);
780 m->added_output = NULL;
782 leave_mutex(m, acquired);
/* Wakes the thread blocked in h2_mplx_out_trywait(), if any: signals
 * m->added_output when it is set. `stream_id` is not used by the signalling
 * shown here. */
787 static void have_out_data_for(h2_mplx *m, int stream_id)
791 if (m->added_output) {
792 apr_thread_cond_signal(m->added_output);
/* Re-sorts the schedule queue m->q with the comparator `cmp` (and its
 * `ctx`) under the mplx mutex and logs at TRACE1. One branch yields
 * APR_ECONNABORTED instead (presumably for an aborted mplx - confirm). */
796 apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
802 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
804 status = APR_ECONNABORTED;
807 h2_iq_sort(m->q, cmp, ctx);
808 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
809 "h2_mplx(%ld): reprioritize tasks", m->id);
811 leave_mutex(m, acquired);
/* Hands a stream to the mplx for processing.
 *
 * Under the mutex the stream is added to m->streams. A stream that already
 * carries a response is put on m->sready for submission; otherwise an input
 * beam is created for it and its id is queued in m->q using `cmp`/`ctx`
 * for ordering. m->need_registration is raised when the queue was empty,
 * and registration with the workers is requested when fewer than
 * workers_max workers are busy. One branch yields APR_ECONNABORTED
 * (presumably for an aborted mplx - confirm).
 *
 * After leaving the mutex, a requested registration is carried out with
 * h2_workers_register(). */
816 apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
817 h2_stream_pri_cmp *cmp, void *ctx)
820 int do_registration = 0;
824 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
826 status = APR_ECONNABORTED;
829 h2_ihash_add(m->streams, stream);
830 if (stream->response) {
831 /* already have a response, schedule for submit */
832 h2_ihash_add(m->sready, stream);
835 h2_beam_create(&stream->input, stream->pool, stream->id,
837 if (!m->need_registration) {
838 m->need_registration = h2_iq_empty(m->q);
840 if (m->workers_busy < m->workers_max) {
841 do_registration = m->need_registration;
843 h2_iq_add(m->q, stream->id, cmp, ctx);
845 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
846 "h2_mplx(%ld-%d): process, body=%d",
847 m->c->id, stream->id, stream->request->body);
850 leave_mutex(m, acquired);
/* NOTE(review): m->need_registration is written below after leave_mutex(),
 * while other writers (h2_mplx_pop_task, task_done) modify it under the
 * mutex - confirm this unsynchronised write is intended. */
852 if (do_registration) {
853 m->need_registration = 0;
854 h2_workers_register(m->workers, m);
/* Takes the next runnable stream id off the schedule queue and creates a
 * task for it. Expected to run with the mplx mutex held.
 *
 * The loop continues while the mplx is not aborted, no task has been
 * created, fewer than m->workers_limit workers are busy and the queue
 * yields a positive stream id. For a stream still present in m->streams, a
 * slave connection is taken from m->spare_slaves or created from m->pool,
 * given the master's scoreboard handle, and used to create the task, which
 * is added to m->tasks and tagged in the slave's notes. Pre-connection
 * hooks are run for the slave, the task is marked started and
 * m->max_stream_started is updated. The stream's input beam receives its
 * timeout, callbacks and the mplx mutex.
 *
 * Returns the created task, or NULL when none could be started. */
859 static h2_task *pop_task(h2_mplx *m)
861 h2_task *task = NULL;
864 while (!m->aborted && !task && (m->workers_busy < m->workers_limit)
865 && (sid = h2_iq_shift(m->q)) > 0) {
867 stream = h2_ihash_get(m->streams, sid);
869 conn_rec *slave, **pslave;
872 pslave = (conn_rec **)apr_array_pop(m->spare_slaves);
877 slave = h2_slave_create(m->c, m->pool, NULL);
881 slave->sbh = m->c->sbh;
883 task = h2_task_create(slave, stream->request, stream->input, m);
884 h2_ihash_add(m->tasks, task);
887 apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id);
889 h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
892 task->worker_started = 1;
893 task->started_at = apr_time_now();
894 if (sid > m->max_stream_started) {
895 m->max_stream_started = sid;
899 h2_beam_timeout_set(stream->input, m->stream_timeout);
900 h2_beam_on_consumed(stream->input, stream_input_consumed, m);
901 h2_beam_on_file_beam(stream->input, can_beam_file, m);
902 h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
/* Public, mutex-protected way for a worker to fetch its next task.
 *
 * has_more: out; set to whether the schedule queue is still non-empty.
 * When no task is returned, m->need_registration is raised so that the
 * mplx gets registered with the workers again later. */
911 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
913 h2_task *task = NULL;
918 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
924 *has_more = !h2_iq_empty(m->q);
/* NOTE(review): `has_more` is the pointer itself (already dereferenced
 * above), so this test reduces to `!task`; `*has_more` may have been
 * intended - confirm. */
927 if (has_more && !task) {
928 m->need_registration = 1;
930 leave_mutex(m, acquired);
/* Bookkeeping for a task whose worker (or engine) has finished. Expected to
 * run with the mplx mutex held.
 *
 * Visible handling:
 *  - a task that was handed over to an engine is switched to non-blocking
 *    I/O and engines waiting on task_thawed are woken;
 *  - when `ngn` is given, the bytes still buffered in the output beam are
 *    reported to it via h2_req_engine_out_consumed();
 *  - an engine owned by the task is checked for shutdown (warning if not)
 *    and reported done to the engine shed;
 *  - a task registered in m->redo_tasks is, unless the mplx is aborted or
 *    its stream is gone, removed from that set and its stream id re-queued;
 *  - otherwise the task is marked worker_done, its output beam is detached
 *    from callbacks and mutex, and a task that started after the last idle
 *    block may double m->workers_limit (bounded via H2MIN) once
 *    limit_change_interval has passed;
 *  - if the stream is still open, a suspended stream is moved to
 *    m->sresume; if the stream sits in m->shold it is moved to m->spurge
 *    for destruction by the master; if it cannot be found the task is
 *    destroyed here;
 *  - m->join_wait is signalled (presumably only when a join is pending -
 *    confirm).
 *
 * ngn: engine to receive the final output accounting; NULL when called
 *      from h2_mplx_task_done(). */
935 static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
938 /* this task was handed over to an engine for processing
939 * and the original worker has finished. That means the
940 * engine may start processing now. */
942 /* we do not want the task to block on writing response
943 * bodies into the mplx. */
944 h2_task_set_io_blocking(task, 0);
945 apr_thread_cond_broadcast(m->task_thawed);
951 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
952 "h2_mplx(%ld): task(%s) done", m->id, task->id);
954 stream = h2_ihash_get(m->streams, task->stream_id);
958 if (task->output.beam) {
959 h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
960 bytes += h2_beam_get_buffered(task->output.beam);
963 /* we need to report consumed and current buffered output
964 * to the engine. The request will be streamed out or cancelled,
965 * no more data is coming from it and the engine should update
966 * its calculations before we destroy this information. */
967 h2_req_engine_out_consumed(ngn, task->c, bytes);
972 if (!h2_req_engine_is_shutdown(task->engine)) {
973 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
974 "h2_mplx(%ld): task(%s) has not-shutdown "
975 "engine(%s)", m->id, task->id,
976 h2_req_engine_get_id(task->engine));
978 h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
981 if (!m->aborted && stream && m->redo_tasks
982 && h2_ihash_get(m->redo_tasks, task->stream_id)) {
983 /* reset and schedule again */
985 h2_ihash_remove(m->redo_tasks, task->stream_id);
986 h2_iq_add(m->q, task->stream_id, NULL, NULL);
990 task->worker_done = 1;
991 task->done_at = apr_time_now();
992 if (task->output.beam) {
993 h2_beam_on_consumed(task->output.beam, NULL, NULL);
994 h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
996 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
997 "h2_mplx(%s): request done, %f ms elapsed", task->id,
998 (task->done_at - task->started_at) / 1000.0);
999 if (task->started_at > m->last_idle_block) {
1000 /* this task finished without causing an 'idle block', e.g.
1001 * a block by flow control.
1003 if (task->done_at- m->last_limit_change >= m->limit_change_interval
1004 && m->workers_limit < m->workers_max) {
1005 /* Well behaving stream, allow it more workers */
1006 m->workers_limit = H2MIN(m->workers_limit * 2,
1008 m->last_limit_change = task->done_at;
1009 m->need_registration = 1;
1010 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1011 "h2_mplx(%ld): increase worker limit to %d",
1012 m->id, m->workers_limit);
1017 /* hang around until the stream deregisters */
1018 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1019 "h2_mplx(%s): task_done, stream still open",
1021 if (h2_stream_is_suspended(stream)) {
1022 /* more data will not arrive, resume the stream */
1023 h2_ihash_add(m->sresume, stream);
1024 have_out_data_for(m, stream->id);
1028 /* stream no longer active, was it placed in hold? */
1029 stream = h2_ihash_get(m->shold, task->stream_id);
1031 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1032 "h2_mplx(%s): task_done, stream in hold",
1034 /* We cannot destroy the stream here since this is
1035 * called from a worker thread and freeing memory pools
1036 * is only safe in the only thread using it (and its
1037 * parent pool / allocator) */
1038 h2_ihash_remove(m->shold, stream->id);
1039 h2_ihash_add(m->spurge, stream);
1042 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
1043 "h2_mplx(%s): task_done, stream not found",
1045 task_destroy(m, task, 0);
1049 apr_thread_cond_signal(m->join_wait);
/* Called by a worker when it has finished `task`. Under the mplx mutex the
 * task is completed through task_done() (without engine) and, when the
 * caller asks for more work, the next task is popped into *ptask.
 *
 * ptask: out; receives the next task or NULL (presumably may itself be NULL
 *        when the caller wants no further task - confirm). */
1055 void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
1059 if (enter_mutex(m, &acquired) == APR_SUCCESS) {
1060 task_done(m, task, NULL);
1063 /* caller wants another task */
1064 *ptask = pop_task(m);
1066 leave_mutex(m, acquired);
1070 /*******************************************************************************
1071 * h2_mplx DoS protection
1072 ******************************************************************************/
/* h2_ihash iterator that selects, into the task_iter_ctx, the most recently
 * started task which is still running, can be redone according to
 * h2_task_can_redo() and is not yet listed in m->redo_tasks. A candidate
 * replaces the current choice when none exists or when it started later. */
1074 static int latest_repeatable_unsubmitted_iter(void *data, void *val)
1076 task_iter_ctx *ctx = data;
1077 h2_task *task = val;
1078 if (!task->worker_done && h2_task_can_redo(task)
1079 && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) {
1080 /* this task occupies a worker, the response has not been submitted yet,
1081 * not been cancelled and it is a repeatable request
1082 * -> it can be re-scheduled later */
1083 if (!ctx->task || ctx->task->started_at < task->started_at) {
1084 /* we did not have one or this one was started later */
/* Runs latest_repeatable_unsubmitted_iter() over all tasks of the mplx.
 * NOTE(review): presumably returns the task the iterator selected, or NULL
 * when there is none - confirm. */
1091 static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m)
1096 h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx);
/* h2_ihash iterator that looks for a task which is still running and whose
 * run time (ctx->now minus started_at) exceeds m->stream_timeout. */
1100 static int timed_out_busy_iter(void *data, void *val)
1102 task_iter_ctx *ctx = data;
1103 h2_task *task = val;
1104 if (!task->worker_done
1105 && (ctx->now - task->started_at) > ctx->m->stream_timeout) {
1106 /* timed out stream occupying a worker, found */
/* Runs timed_out_busy_iter() over all tasks, using the current time as
 * reference.
 * NOTE(review): presumably returns the task found, or NULL - confirm. */
1113 static h2_task *get_timed_out_busy_task(h2_mplx *m)
1118 ctx.now = apr_time_now();
1119 h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx);
/* Frees workers when more are busy than m->workers_limit allows. Expected
 * to run with the mplx mutex held (called from h2_mplx_idle()).
 *
 * m->redo_tasks is created on first use. Repeatable tasks, latest started
 * first, are reset with H2_ERR_CANCEL and recorded in m->redo_tasks so
 * that task_done() re-queues them. If the busy count minus the redo set
 * still exceeds the limit, a timed-out busy task is looked up to decide
 * whether the connection has to be failed.
 * NOTE(review): h2_ihash_count() is stored in an apr_size_t elsewhere in
 * this file; if it returns an unsigned type, both subtractions below are
 * evaluated unsigned and a "negative" result would compare as very large -
 * confirm the counts can never make that happen. */
1123 static apr_status_t unschedule_slow_tasks(h2_mplx *m)
1128 if (!m->redo_tasks) {
1129 m->redo_tasks = h2_ihash_create(m->pool, offsetof(h2_task, stream_id));
1131 /* Try to get rid of streams that occupy workers. Look for safe requests
1132 * that are repeatable. If none found, fail the connection.
1134 n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks));
1135 while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) {
1136 h2_task_rst(task, H2_ERR_CANCEL);
1137 h2_ihash_add(m->redo_tasks, task);
1141 if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) {
1142 task = get_timed_out_busy_task(m);
1144 /* Too many busy workers, unable to cancel enough streams
1145 * and with a busy, timed out stream, we tell the client
/* Called when the session is idle; protects against connections that keep
 * workers busy without making progress.
 *
 * Under the mutex, when there are streams and busy workers, the current
 * time is recorded as m->last_idle_block. If the worker limit is above 2
 * and limit_change_interval has passed since the last change, the limit is
 * stepped down to the next value of 16, 8, 4 or 2 and the change is
 * logged. If more workers are busy than the limit allows,
 * unschedule_slow_tasks() is invoked and its status kept.
 *
 * NOTE(review): presumably returns that status, APR_SUCCESS otherwise -
 * confirm. */
1153 apr_status_t h2_mplx_idle(h2_mplx *m)
1155 apr_status_t status = APR_SUCCESS;
1159 if (enter_mutex(m, &acquired) == APR_SUCCESS) {
1160 apr_size_t scount = h2_ihash_count(m->streams);
1161 if (scount > 0 && m->workers_busy) {
1162 /* If we have streams in connection state 'IDLE', meaning
1163 * all streams are ready to sent data out, but lack
1166 * This is ok, unless we have streams that still occupy
1167 * h2 workers. As worker threads are a scarce resource,
1168 * we need to take measures that we do not get DoSed.
1170 * This is what we call an 'idle block'. Limit the amount
1171 * of busy workers we allow for this connection until it
1174 now = apr_time_now();
1175 m->last_idle_block = now;
1176 if (m->workers_limit > 2
1177 && now - m->last_limit_change >= m->limit_change_interval) {
1178 if (m->workers_limit > 16) {
1179 m->workers_limit = 16;
1181 else if (m->workers_limit > 8) {
1182 m->workers_limit = 8;
1184 else if (m->workers_limit > 4) {
1185 m->workers_limit = 4;
1187 else if (m->workers_limit > 2) {
1188 m->workers_limit = 2;
1190 m->last_limit_change = now;
1191 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1192 "h2_mplx(%ld): decrease worker limit to %d",
1193 m->id, m->workers_limit);
1196 if (m->workers_busy > m->workers_limit) {
1197 status = unschedule_slow_tasks(m);
1200 leave_mutex(m, acquired);
1205 /*******************************************************************************
1206 * HTTP/2 request engines
1207 ******************************************************************************/
/* Counter incremented by ngn_update_window() for every task whose output
 * consumption was signalled; reset to 0 by ngn_out_update_windows(). */
1212 int streams_updated;
/* h2_ihash iterator: for a task assigned to the engine stored in the
 * ngn_update_ctx, triggers output-consumed signalling and counts the task
 * in streams_updated when output_consumed_signal() returns non-zero. */
1215 static int ngn_update_window(void *ctx, void *val)
1217 ngn_update_ctx *uctx = ctx;
1218 h2_task *task = val;
1219 if (task && task->assigned == uctx->ngn
1220 && output_consumed_signal(uctx->m, task)) {
1221 ++uctx->streams_updated;
/* Runs ngn_update_window() over all tasks of the mplx for engine `ngn`.
 * Returns APR_SUCCESS when at least one task was updated and APR_EAGAIN
 * otherwise. */
1226 static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn)
1232 ctx.streams_updated = 0;
1233 h2_ihash_iter(m->tasks, ngn_update_window, &ctx);
1235 return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN;
/* Offers the task behind request `r` to a request engine of type
 * `ngn_type`.
 *
 * Returns APR_ECONNABORTED when no task can be resolved for the request.
 * Otherwise, under the mplx mutex, the task's stream is looked up and the
 * task is pushed to the engine shed with the initialiser `einit`; the
 * other branch yields APR_ECONNABORTED (presumably when the stream is
 * gone - confirm). */
1238 apr_status_t h2_mplx_req_engine_push(const char *ngn_type,
1240 http2_req_engine_init *einit)
1242 apr_status_t status;
1247 task = h2_ctx_rget_task(r);
1249 return APR_ECONNABORTED;
1254 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1255 h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
1258 status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit);
1261 status = APR_ECONNABORTED;
1263 leave_mutex(m, acquired);
/* Lets engine `ngn` pull its next request from the engine shed.
 *
 * Under the mplx mutex the output windows of the engine's tasks are
 * refreshed first. A blocking read (block == APR_BLOCK_READ) means the
 * engine is willing to shut down when nothing is available: if the
 * schedule queue is non-empty, a pull is attempted and, on EAGAIN, the
 * thread waits up to 20 ms on task_thawed before pulling once more. In
 * all other cases a single pull is made.
 *
 * block:    APR_BLOCK_READ requests the shutdown-capable pull.
 * capacity: passed through to h2_ngn_shed_pull_task().
 * The request of the pulled task, or NULL, is stored through the out
 * parameter `pr`. */
1268 apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn,
1269 apr_read_type_e block,
1270 apr_uint32_t capacity,
1273 h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn);
1274 h2_mplx *m = h2_ngn_shed_get_ctx(shed);
1275 apr_status_t status;
1276 h2_task *task = NULL;
1279 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1280 int want_shutdown = (block == APR_BLOCK_READ);
1282 /* Take this opportunity to update output consumption
1283 * for this engine */
1284 ngn_out_update_windows(m, ngn);
1286 if (want_shutdown && !h2_iq_empty(m->q)) {
1287 /* For a blocking read, check first if requests are to be
1288 * had and, if not, wait a short while before doing the
1289 * blocking, and if unsuccessful, terminating read.
1291 status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
1292 if (APR_STATUS_IS_EAGAIN(status)) {
1293 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
1294 "h2_mplx(%ld): start block engine pull", m->id);
1295 apr_thread_cond_timedwait(m->task_thawed, m->lock,
1296 apr_time_from_msec(20));
1297 status = h2_ngn_shed_pull_task(shed, ngn, capacity, 1, &task);
1301 status = h2_ngn_shed_pull_task(shed, ngn, capacity,
1302 want_shutdown, &task);
1304 leave_mutex(m, acquired);
1306 *pr = task? task->r : NULL;
/*
 * Called when engine `ngn` has finished with the request running on the
 * slave connection `r_conn`.
 *
 * The task is looked up from the connection via h2_ctx_cget_task() and the
 * mplx is taken from the task. With the mplx mutex held, output windows of
 * the engine's tasks are refreshed, the shed is told the task is done via
 * h2_ngn_shed_done_task(), and task_done() is invoked for it.
 *
 * If the mutex cannot be taken, nothing is done; no status is reported to
 * the caller.
 *
 * NOTE(review): the conditions around the "cannot report that as done"
 * comment and the task_done() call are not visible in this excerpt;
 * presumably task_done() is skipped while the task still hosts an engine -
 * confirm.
 */
1310 void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
1312 h2_task *task = h2_ctx_cget_task(r_conn);
1315 h2_mplx *m = task->mplx;
1318 if (enter_mutex(m, &acquired) == APR_SUCCESS) {
/* Take this opportunity to update output consumption
 * for this engine */
1319 ngn_out_update_windows(m, ngn);
1320 h2_ngn_shed_done_task(m->ngn_shed, ngn, task);
1322 /* cannot report that as done until engine returns */
1325 task_done(m, task, ngn);
1327 /* Output windows were already refreshed at the top of this
1328 * critical section; all that is left is to release the lock. */
1329 leave_mutex(m, acquired);
1334 /*******************************************************************************
1335 * mplx master events dispatching
1336 ******************************************************************************/
/*
 * Iteration callback used with h2_ihash_iter() over m->streams.
 *
 * ctx: the value passed as iteration context (the h2_mplx at the call
 *      site in h2_mplx_dispatch_master_events()).
 * val: the current set entry.
 *
 * Both are forwarded unchanged to input_consumed_signal(); whatever that
 * call returns is discarded.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably non-zero so that iteration continues - confirm.
 */
1338 static int update_window(void *ctx, void *val)
1340 input_consumed_signal(ctx, val);
/*
 * Deliver pending per-stream events to the callbacks supplied by the
 * caller. All work happens with the mplx mutex held.
 *
 * Steps, in order:
 *  1. update_window() is run over every entry of m->streams.
 *  2. If on_response is given and m->sready is not empty, up to
 *     H2_ALEN(streams) stream ids are shifted out of m->sready. For each
 *     id the stream and its task are looked up. A task is flagged as
 *     submitted; if it carries rst_error the stream is reset with that
 *     error, and h2_stream_set_response() hands the task's response and
 *     output beam to the stream. on_response(on_ctx, stream->id) is then
 *     called.
 *  3. If on_resume is given and m->sresume is not empty, up to
 *     H2_ALEN(streams) ids are shifted out of m->sresume. For each id the
 *     stream's suspended flag is cleared and on_resume(on_ctx, stream->id)
 *     is called.
 *
 * `status` is overwritten by every callback invocation, so after the
 * loops it holds only the result of the last callback that ran (or the
 * enter_mutex() result if none ran).
 *
 * NOTE(review): handling of ids whose stream lookup fails, the branch
 * structure around the task checks, and the final return are not visible
 * in this excerpt - confirm against the full function. At most
 * H2_ALEN(streams) entries of each set are processed per call; presumably
 * remaining entries are picked up by a later call - confirm.
 */
1344 apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
1345 stream_ev_callback *on_resume,
1346 stream_ev_callback *on_response,
1349 apr_status_t status;
1357 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1358 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
1359 "h2_mplx(%ld): dispatch events", m->id);
1361 /* update input windows for streams */
1362 h2_ihash_iter(m->streams, update_window, m);
1364 if (on_response && !h2_ihash_empty(m->sready)) {
/* Take a bounded batch of ready stream ids into the local array. */
1365 n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
1366 for (i = 0; i < n; ++i) {
1367 stream = h2_ihash_get(m->streams, streams[i]);
1371 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
1372 "h2_mplx(%ld-%d): on_response",
1374 task = h2_ihash_get(m->tasks, stream->id);
/* Record on the task that its result is being handed to the stream. */
1376 task->submitted = 1;
1377 if (task->rst_error) {
1378 h2_stream_rst(stream, task->rst_error);
1381 AP_DEBUG_ASSERT(task->response);
1382 h2_stream_set_response(stream, task->response, task->output.beam);
1386 /* We have the stream ready without a task. This happens
1387 * when we fail streams early. A response should already
1389 AP_DEBUG_ASSERT(stream->response || stream->rst_error);
1391 status = on_response(on_ctx, stream->id);
1395 if (on_resume && !h2_ihash_empty(m->sresume)) {
1396 n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
1397 for (i = 0; i < n; ++i) {
1398 stream = h2_ihash_get(m->streams, streams[i]);
1402 ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
1403 "h2_mplx(%ld-%d): on_resume",
1405 h2_stream_set_suspended(stream, 0);
1406 status = on_resume(on_ctx, stream->id);
1410 leave_mutex(m, acquired);
/*
 * Beam "produced" callback; h2_mplx_suspend_stream() registers it with
 * h2_beam_on_produced(), passing the mplx as context.
 *
 * With the mplx mutex held, the stream with id beam->id is looked up. If
 * it exists and is currently suspended, it is added to m->sresume, the
 * beam's produced callback is set back to NULL/NULL so that it does not
 * fire again, and have_out_data_for() is called for that stream id.
 *
 * The `bytes` argument is not used by any line visible in this excerpt.
 * If the mutex cannot be taken, the event is dropped without any action.
 */
1415 static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
1418 apr_status_t status;
1423 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1424 stream = h2_ihash_get(m->streams, beam->id);
1425 if (stream && h2_stream_is_suspended(stream)) {
/* Queue the stream for resumption on the master side. */
1426 h2_ihash_add(m->sresume, stream);
/* One notification is enough: clear the callback on this beam. */
1427 h2_beam_on_produced(beam, NULL, NULL);
1428 have_out_data_for(m, beam->id);
1430 leave_mutex(m, acquired);
/*
 * Mark the stream with id `stream_id` as suspended and arrange for it to
 * be resumed later.
 *
 * With the mplx mutex held, the stream is looked up in m->streams and its
 * suspended flag is set. The task with the same id is looked up in
 * m->tasks. When the stream has started and there is no task, or the
 * task's worker is done, the stream is put into m->sresume right away.
 * Otherwise output_produced() is registered on the task's output beam so
 * that the stream is resumed once new output is produced.
 *
 * NOTE(review): the lines guarding the stream lookup result and the
 * branch holding the h2_beam_on_produced() call are not visible in this
 * excerpt, nor is the return statement. If the callback registration is
 * the plain "else" of the condition above it, it is reached with
 * task == NULL whenever the stream has not started and no task exists,
 * which would dereference a NULL pointer - confirm and guard if so.
 */
1434 apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
1436 apr_status_t status;
1442 if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
1443 stream = h2_ihash_get(m->streams, stream_id);
1445 h2_stream_set_suspended(stream, 1);
1446 task = h2_ihash_get(m->tasks, stream->id);
/* No producer left for a started stream: resume immediately. */
1447 if (stream->started && (!task || task->worker_done)) {
1448 h2_ihash_add(m->sresume, stream);
1451 /* register callback so that we can resume on new output */
1452 h2_beam_on_produced(task->output.beam, output_produced, m);
1455 leave_mutex(m, acquired);