1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
 */
19 #include <apr_atomic.h>
20 #include <apr_thread_mutex.h>
21 #include <apr_thread_cond.h>
22 #include <apr_strings.h>
26 #include <http_core.h>
29 #include "h2_private.h"
30 #include "h2_config.h"
33 #include "h2_io_set.h"
34 #include "h2_response.h"
36 #include "h2_request.h"
37 #include "h2_stream.h"
38 #include "h2_stream_set.h"
40 #include "h2_task_input.h"
41 #include "h2_task_output.h"
42 #include "h2_task_queue.h"
43 #include "h2_workers.h"
/* Test whether this multiplexer has been aborted; on abort, *pstatus is set
 * to APR_ECONNABORTED so callers can propagate the failure.
 * NOTE(review): this listing is line-sampled -- the aborted-flag check and
 * the return statement of this function are missing from the excerpt. */
46 static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
49     *pstatus = APR_ECONNABORTED;
55 static void have_out_data_for(h2_mplx *m, int stream_id);
/* Tear down the multiplexer: destroy both io sets, the mutex, and finally
 * the pool (which owns the private allocator set up in h2_mplx_create).
 * NOTE(review): line-sampled excerpt -- NULL guards and the closing brace
 * present in the full source are not visible here. */
57 static void h2_mplx_destroy(h2_mplx *m)
66 h2_io_set_destroy(m->ready_ios);
70 h2_io_set_destroy(m->stream_ios);
75 apr_thread_mutex_destroy(m->lock);
80 apr_pool_destroy(m->pool);
85 * A h2_mplx needs to be thread-safe *and* it will be called by
86 * the h2_session thread *and* the h2_worker threads. Therefore:
87 * - calls are protected by a mutex lock, m->lock
88 * - the pool needs its own allocator, since apr_allocator_t are
89 * not re-entrant. The separate allocator works without a
90 * separate lock since we already protect h2_mplx itself.
91 * Since HTTP/2 connections can be expected to live longer than
92 * their HTTP/1 cousins, the separate allocator seems to work better
93 * than protecting a shared h2_session one with an own lock.
/* Create a new h2_mplx for connection c, allocated from parent but with its
 * own apr_allocator_t so pool operations need no extra lock (the m->lock
 * mutex already serializes all access -- see the design comment above).
 * Initializes ref count to 1, the task queue, io/stream sets, and the
 * per-session memory and file-handle budgets from the h2 configuration.
 * NOTE(review): line-sampled excerpt -- the declaration of `m`, the error
 * returns after the allocator/mutex failures, and `return m;` are missing. */
95 h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers)
97 apr_status_t status = APR_SUCCESS;
98 h2_config *conf = h2_config_get(c);
99 apr_allocator_t *allocator = NULL;
101 AP_DEBUG_ASSERT(conf);
103 status = apr_allocator_create(&allocator);
104 if (status != APR_SUCCESS) {
108 m = apr_pcalloc(parent, sizeof(h2_mplx));
111 APR_RING_ELEM_INIT(m, link);
112 apr_atomic_set32(&m->refs, 1);
114 apr_pool_create_ex(&m->pool, parent, NULL, allocator);
/* Hand allocator ownership to the new pool so it is freed with the pool. */
118 apr_allocator_owner_set(allocator, m->pool);
120 status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT,
122 if (status != APR_SUCCESS) {
127 m->bucket_alloc = apr_bucket_alloc_create(m->pool);
129 m->q = h2_tq_create(m->id, m->pool);
130 m->stream_ios = h2_io_set_create(m->pool);
131 m->ready_ios = h2_io_set_create(m->pool);
132 m->closed = h2_stream_set_create(m->pool);
133 m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
134 m->workers = workers;
136 m->file_handles_allowed = h2_config_geti(conf, H2_CONF_SESSION_FILES);
/* Atomically take one reference on the mplx (paired with release()). */
141 static void reference(h2_mplx *m)
143 apr_atomic_inc32(&m->refs);
/* Atomically drop one reference; when the count reaches zero, wake any
 * thread blocked in h2_mplx_release_and_join() via m->join_wait.
 * NOTE(review): line-sampled excerpt -- a guard that join_wait is set is
 * presumably present in the full source; confirm before relying on it. */
146 static void release(h2_mplx *m)
148 if (!apr_atomic_dec32(&m->refs)) {
150 apr_thread_cond_signal(m->join_wait);
/* Public wrapper around reference(). NOTE(review): body dropped by the
 * line-sampling of this listing. */
155 void h2_mplx_reference(h2_mplx *m)
/* Public wrapper around release(). NOTE(review): body dropped by the
 * line-sampling of this listing. */
159 void h2_mplx_release(h2_mplx *m)
/* Announce this mplx to the workers hub so h2_worker threads can pick up
 * its queued tasks. */
164 static void workers_register(h2_mplx *m) {
165 /* Initially, there was ref count increase for this as well, but
166 * this is not needed, even harmful.
167 * h2_workers is only a hub for all the h2_worker instances.
168 * At the end-of-life of this h2_mplx, we always unregister at
169 * the workers. The thing to manage are all the h2_worker instances
170 * out there. Those may hold a reference to this h2_mplx and we cannot
171 * call them to unregister.
173 * Therefore: ref counting for h2_workers is not needed, ref counting
174 * for h2_worker using this is critical.
176 h2_workers_register(m->workers, m);
/* Withdraw this mplx from the workers hub; no new tasks will be handed out. */
179 static void workers_unregister(h2_mplx *m) {
180 h2_workers_unregister(m->workers, m);
/* Drop the caller's reference and block until all other holders (worker
 * threads) have released theirs, then destroy the mplx. Waits on `wait`
 * in 10-second slices, escalating the log level after the first attempt
 * and warning once 6 attempts (~1 minute) are exhausted.
 * NOTE(review): line-sampled excerpt -- the release() call, the setting of
 * m->join_wait, the loop/brace closings, the h2_mplx_destroy(m) call, and
 * the final return are not visible here. */
183 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
186 workers_unregister(m);
188 status = apr_thread_mutex_lock(m->lock);
189 if (APR_SUCCESS == status) {
193 while (apr_atomic_read32(&m->refs) > 0) {
195 ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG),
197 "h2_mplx(%ld): release_join, refs=%d, waiting...",
199 apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
200 if (++attempts >= 6) {
201 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
203 "h2_mplx(%ld): join attempts exhausted, refs=%d",
209 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
210 "h2_mplx(%ld): release_join -> destroy", m->id);
213 apr_thread_mutex_unlock(m->lock);
/* Abort the whole session: under the lock, destroy every stream io, then
 * unregister from the workers hub so no further tasks are scheduled.
 * NOTE(review): line-sampled excerpt -- the setting of an aborted flag
 * (checked by is_aborted()) is presumably here in the full source. */
219 void h2_mplx_abort(h2_mplx *m)
223 status = apr_thread_mutex_lock(m->lock);
224 if (APR_SUCCESS == status) {
226 h2_io_set_destroy_all(m->stream_ios);
227 apr_thread_mutex_unlock(m->lock);
229 workers_unregister(m);
/* Open stream_id on this mplx: create the h2_stream (reusing m->spare_pool
 * when one is cached, otherwise creating a subpool of m->pool) and ensure a
 * matching h2_io exists in m->stream_ios. Returns the new stream, or NULL
 * when allocation fails (status APR_ENOMEM internally).
 * NOTE(review): line-sampled excerpt -- the spare-pool branch, the io
 * declaration, closing braces and `return stream;` are not visible. */
233 h2_stream *h2_mplx_open_io(h2_mplx *m, int stream_id)
235 h2_stream *stream = NULL;
242 status = apr_thread_mutex_lock(m->lock);
243 if (APR_SUCCESS == status) {
244 apr_pool_t *stream_pool = m->spare_pool;
247 apr_pool_create(&stream_pool, m->pool);
250 m->spare_pool = NULL;
253 stream = h2_stream_create(stream_id, stream_pool, m);
254 stream->state = H2_STREAM_ST_OPEN;
256 io = h2_io_set_get(m->stream_ios, stream_id);
258 io = h2_io_create(stream_id, stream_pool, m->bucket_alloc);
259 h2_io_set_add(m->stream_ios, io);
261 status = io? APR_SUCCESS : APR_ENOMEM;
262 apr_thread_mutex_unlock(m->lock);
/* Free a stream and its io. The stream's pool is detached and either kept
 * as m->spare_pool for reuse (after a clear) or, when a spare already
 * exists, one of them is destroyed. The file handles owned by the io are
 * returned to the session budget, and the io is removed from stream_ios.
 * Caller must hold m->lock.
 * NOTE(review): line-sampled excerpt -- the branch structure deciding
 * between clear/destroy/keep-as-spare, and NULL guards on pool/io, are
 * not fully visible; confirm against the complete source. */
267 static void stream_destroy(h2_mplx *m, h2_stream *stream, h2_io *io)
269 apr_pool_t *pool = h2_stream_detach_pool(stream);
271 apr_pool_clear(pool);
273 apr_pool_destroy(m->spare_pool);
275 m->spare_pool = pool;
277 h2_stream_destroy(stream);
279 /* The pool is cleared/destroyed which also closes all
280 * allocated file handles. Give this count back to our
281 * file handle pool. */
282 m->file_handles_allowed += io->files_handles_owned;
283 h2_io_set_remove(m->stream_ios, io);
/* Called by the session when it is done with a stream. If the worker task
 * has already finished (or no io exists), the stream is destroyed at once;
 * otherwise it is parked in m->closed so h2_mplx_task_done() can reap it.
 * NOTE(review): line-sampled excerpt -- the `else` keyword between the two
 * branches and the final return are not visible. */
288 apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
292 status = apr_thread_mutex_lock(m->lock);
293 if (APR_SUCCESS == status) {
294 h2_io *io = h2_io_set_get(m->stream_ios, stream->id);
295 if (!io || io->task_done) {
296 /* No more io or task already done -> cleanup immediately */
297 stream_destroy(m, stream, io);
300 /* Add stream to closed set for cleanup when task is done */
301 h2_stream_set_add(m->closed, stream);
303 apr_thread_mutex_unlock(m->lock);
/* Called by a worker when its task for stream_id has finished. If the main
 * connection already closed the stream (it sits in m->closed as a zombie),
 * remove and destroy it now; otherwise only the task is marked done so the
 * session's later cleanup is immediate.
 * NOTE(review): line-sampled excerpt -- the branch conditions and the
 * io->task_done assignment in the else path are not visible. */
308 void h2_mplx_task_done(h2_mplx *m, int stream_id)
310 apr_status_t status = apr_thread_mutex_lock(m->lock);
311 if (APR_SUCCESS == status) {
312 h2_stream *stream = h2_stream_set_get(m->closed, stream_id);
313 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
314 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
315 "h2_mplx(%ld): task(%d) done", m->id, stream_id);
317 /* stream was already closed by main connection and is in
318 * zombie state. Now that the task is done with it, we
319 * can free its resources. */
320 h2_stream_set_remove(m->closed, stream);
321 stream_destroy(m, stream, io);
324 /* main connection has not finished stream. Mark task as done
325 * so that eventual cleanup can start immediately. */
328 apr_thread_mutex_unlock(m->lock);
/* Worker-side read of request body data for stream_id into bb. Fails fast
 * with APR_ECONNABORTED on an aborted mplx. While no data is available
 * (APR_EAGAIN) and blocking reads were requested, parks the caller on
 * `iowait` until h2_mplx_in_write()/in_close() signal new input.
 * NOTE(review): line-sampled excerpt -- the abort pre-check condition, the
 * io NULL-check, closing braces and the final return are not visible. */
332 apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
333 int stream_id, apr_bucket_brigade *bb,
334 struct apr_thread_cond_t *iowait)
339 return APR_ECONNABORTED;
341 status = apr_thread_mutex_lock(m->lock);
342 if (APR_SUCCESS == status) {
343 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
/* Publish the caller's condvar so writers can wake this reader. */
345 io->input_arrived = iowait;
346 status = h2_io_in_read(io, bb, 0);
347 while (status == APR_EAGAIN
348 && !is_aborted(m, &status)
349 && block == APR_BLOCK_READ) {
350 apr_thread_cond_wait(io->input_arrived, m->lock);
351 status = h2_io_in_read(io, bb, 0);
353 io->input_arrived = NULL;
358 apr_thread_mutex_unlock(m->lock);
/* Session-side append of request body data (bb) to the stream's input and
 * wake-up of a worker blocked in h2_mplx_in_read(). Returns
 * APR_ECONNABORTED when the mplx is aborted.
 * NOTE(review): line-sampled excerpt -- the abort pre-check condition, io
 * NULL handling and the final return are not visible. */
363 apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
364 apr_bucket_brigade *bb)
369 return APR_ECONNABORTED;
371 status = apr_thread_mutex_lock(m->lock);
372 if (APR_SUCCESS == status) {
373 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
375 status = h2_io_in_write(io, bb);
376 if (io->input_arrived) {
377 apr_thread_cond_signal(io->input_arrived);
383 apr_thread_mutex_unlock(m->lock);
/* Close the input (request body EOS) for stream_id and wake any blocked
 * reader. When no io exists for the id, reports APR_ECONNABORTED.
 * NOTE(review): line-sampled excerpt -- the abort pre-check condition and
 * the else-branch structure are not fully visible. */
388 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
393 return APR_ECONNABORTED;
395 status = apr_thread_mutex_lock(m->lock);
396 if (APR_SUCCESS == status) {
397 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
399 status = h2_io_in_close(io);
400 if (io->input_arrived) {
401 apr_thread_cond_signal(io->input_arrived);
405 status = APR_ECONNABORTED;
407 apr_thread_mutex_unlock(m->lock);
413 h2_mplx_consumed_cb *cb;
/* h2_io_set_iter callback: for each io with newly consumed input bytes,
 * report the amount through the caller's consumed-callback (so the session
 * can send WINDOW_UPDATE frames), reset the counter, and count the stream
 * as updated. NOTE(review): return statement not visible in this excerpt. */
418 static int update_window(void *ctx, h2_io *io)
420 if (io->input_consumed) {
421 update_ctx *uctx = (update_ctx*)ctx;
422 uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
423 io->input_consumed = 0;
424 ++uctx->streams_updated;
/* Walk all stream ios and invoke cb for every stream whose input was
 * consumed since the last call (flow-control window accounting). Returns
 * APR_SUCCESS if at least one stream was updated; aborted mplx yields
 * APR_ECONNABORTED. NOTE(review): line-sampled excerpt -- the ctx setup of
 * cb/cb_ctx, the default status value, and the return are not visible. */
429 apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
430 h2_mplx_consumed_cb *cb, void *cb_ctx)
435 return APR_ECONNABORTED;
437 status = apr_thread_mutex_lock(m->lock);
438 if (APR_SUCCESS == status) {
443 ctx.streams_updated = 0;
446 h2_io_set_iter(m->stream_ios, update_window, &ctx);
448 if (ctx.streams_updated) {
449 status = APR_SUCCESS;
451 apr_thread_mutex_unlock(m->lock);
/* Session-side read of response data for stream_id via the data callback
 * cb; *plen/*peos report the amount read and end-of-stream. On a
 * successful read, signals any worker waiting for output to drain. A
 * missing io yields APR_ECONNABORTED.
 * NOTE(review): line-sampled excerpt -- abort pre-check and final return
 * are not visible. */
456 apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
457 h2_io_data_cb *cb, void *ctx,
458 apr_size_t *plen, int *peos)
463 return APR_ECONNABORTED;
465 status = apr_thread_mutex_lock(m->lock);
466 if (APR_SUCCESS == status) {
467 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
469 status = h2_io_out_readx(io, cb, ctx, plen, peos);
470 if (status == APR_SUCCESS && io->output_drained) {
471 apr_thread_cond_signal(io->output_drained);
475 status = APR_ECONNABORTED;
477 apr_thread_mutex_unlock(m->lock);
/* Pop the highest-priority io whose response is ready for submission, look
 * up its stream in the session's stream set, attach the response and output
 * brigade, and wake a worker waiting on output drain. Logs a warning
 * (APLOGNO 02953) when the response's stream no longer exists. Returns the
 * stream to submit, or NULL when none is ready.
 * NOTE(review): line-sampled excerpt -- the io/response NULL checks, brace
 * closings and `return stream;` are not visible. */
482 h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
485 h2_stream *stream = NULL;
490 status = apr_thread_mutex_lock(m->lock);
491 if (APR_SUCCESS == status) {
492 h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
494 h2_response *response = io->response;
495 h2_io_set_remove(m->ready_ios, io);
497 stream = h2_stream_set_get(streams, response->stream_id);
499 h2_stream_set_response(stream, response, io->bbout);
500 if (io->output_drained) {
501 apr_thread_cond_signal(io->output_drained);
505 ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
506 APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
507 m->id, response->stream_id);
510 apr_thread_mutex_unlock(m->lock);
/* Worker-side write of response data from bb into the stream's io, bounded
 * by m->stream_max_mem. When the buffered amount reaches the limit, the
 * worker publishes `iowait` as io->output_drained and sleeps until the
 * session reads data out. Loops until bb is empty, an error occurs, or the
 * mplx aborts; leftover buckets are discarded via apr_brigade_cleanup().
 * Caller must hold m->lock (apr_thread_cond_wait releases it while asleep).
 * NOTE(review): line-sampled excerpt -- inner-loop conditions such as the
 * iowait availability check and the final return are not fully visible. */
515 static apr_status_t out_write(h2_mplx *m, h2_io *io,
516 ap_filter_t* f, apr_bucket_brigade *bb,
517 struct apr_thread_cond_t *iowait)
519 apr_status_t status = APR_SUCCESS;
520 /* We check the memory footprint queued for this stream_id
521 * and block if it exceeds our configured limit.
522 * We will not split buckets to enforce the limit to the last
523 * byte. After all, the bucket is already in memory.
525 while (!APR_BRIGADE_EMPTY(bb)
526 && (status == APR_SUCCESS)
527 && !is_aborted(m, &status)) {
529 status = h2_io_out_write(io, bb, m->stream_max_mem,
530 &m->file_handles_allowed);
532 /* Wait for data to drain until there is room again */
533 while (!APR_BRIGADE_EMPTY(bb)
535 && status == APR_SUCCESS
536 && (m->stream_max_mem <= h2_io_out_length(io))
537 && !is_aborted(m, &status)) {
538 io->output_drained = iowait;
540 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
541 "h2_mplx(%ld-%d): waiting for out drain",
544 apr_thread_cond_wait(io->output_drained, m->lock);
545 io->output_drained = NULL;
548 apr_brigade_cleanup(bb);
/* Open the output side of stream_id: copy the response into the io's pool,
 * add the io to the ready set for submission by the session, write any
 * initial body data via out_write(), and signal that output is available.
 * A missing io (stream already gone) yields APR_ECONNABORTED. Caller must
 * hold m->lock.
 * NOTE(review): line-sampled excerpt -- the io NULL check, the bb != NULL
 * guard before out_write, brace closings and the return are not visible. */
552 static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response,
553 ap_filter_t* f, apr_bucket_brigade *bb,
554 struct apr_thread_cond_t *iowait)
556 apr_status_t status = APR_SUCCESS;
558 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
561 ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c,
562 "h2_mplx(%ld-%d): open response: %s",
563 m->id, stream_id, response->status);
566 io->response = h2_response_copy(io->pool, response);
567 h2_io_set_add(m->ready_ios, io);
569 status = out_write(m, io, f, bb, iowait);
571 have_out_data_for(m, stream_id);
574 status = APR_ECONNABORTED;
/* Public locked wrapper around out_open(): acquire m->lock, open the
 * response output for stream_id, release the lock. Returns
 * APR_ECONNABORTED when the mplx was aborted (checked both before and,
 * per the visible second check, after the locked section).
 * NOTE(review): line-sampled excerpt -- the abort-check conditions and the
 * final return are not visible; note the second ECONNABORTED return at
 * original line 592 appears to precede the unlock -- verify against the
 * full source whether the lock is released on that path. */
579 apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
580 ap_filter_t* f, apr_bucket_brigade *bb,
581 struct apr_thread_cond_t *iowait)
586 return APR_ECONNABORTED;
588 status = apr_thread_mutex_lock(m->lock);
589 if (APR_SUCCESS == status) {
590 status = out_open(m, stream_id, response, f, bb, iowait);
592 return APR_ECONNABORTED;
594 apr_thread_mutex_unlock(m->lock);
/* Worker-side write of further response body data for stream_id: under the
 * lock, push bb into the io via out_write() and notify the session that
 * output is available. A missing io yields APR_ECONNABORTED.
 * NOTE(review): line-sampled excerpt -- abort-check conditions, the else
 * branch structure, and the final return are not visible; as in
 * h2_mplx_out_open, the mid-function ECONNABORTED return appears to skip
 * the unlock -- verify against the full source. */
600 apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
601 ap_filter_t* f, apr_bucket_brigade *bb,
602 struct apr_thread_cond_t *iowait)
607 return APR_ECONNABORTED;
609 status = apr_thread_mutex_lock(m->lock);
610 if (APR_SUCCESS == status) {
612 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
614 status = out_write(m, io, f, bb, iowait);
615 have_out_data_for(m, stream_id);
617 return APR_ECONNABORTED;
621 status = APR_ECONNABORTED;
626 apr_thread_mutex_unlock(m->lock);
/* Close the output side of stream_id. If no response headers were produced
 * before the close (io->response->ngheader unset), a synthetic "500"
 * response is opened first so the stream can terminate cleanly. Then the
 * io output is closed and the session is notified. A missing io yields
 * APR_ECONNABORTED.
 * NOTE(review): line-sampled excerpt -- abort pre-check, io/response NULL
 * guards, brace closings and the final return are not visible. */
632 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
637 return APR_ECONNABORTED;
639 status = apr_thread_mutex_lock(m->lock);
640 if (APR_SUCCESS == status) {
642 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
644 if (!io->response->ngheader) {
645 /* In case a close comes before a response was created,
646 * insert an error one so that our streams can properly
649 h2_response *r = h2_response_create(stream_id,
650 "500", NULL, m->pool);
651 status = out_open(m, stream_id, r, NULL, NULL, NULL);
653 status = h2_io_out_close(io);
654 have_out_data_for(m, stream_id);
656 /* if we were the last output, the whole session might
657 * have gone down in the meantime.
663 status = APR_ECONNABORTED;
666 apr_thread_mutex_unlock(m->lock);
/* Query (under the lock) whether end-of-stream has been seen on the input
 * of stream_id. NOTE(review): line-sampled excerpt -- the has_eos
 * declaration/default, io NULL guard and `return has_eos;` are missing. */
671 int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
679 status = apr_thread_mutex_lock(m->lock);
680 if (APR_SUCCESS == status) {
681 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
683 has_eos = h2_io_in_has_eos_for(io);
685 apr_thread_mutex_unlock(m->lock);
/* Query (under the lock) whether output data is buffered for stream_id.
 * NOTE(review): line-sampled excerpt -- the has_data declaration/default,
 * io NULL guard and `return has_data;` are missing. */
690 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
698 status = apr_thread_mutex_lock(m->lock);
699 if (APR_SUCCESS == status) {
700 h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
702 has_data = h2_io_out_has_data(io);
704 apr_thread_mutex_unlock(m->lock);
/* Session-side wait for new output from any stream: publish `iowait` as
 * m->added_output (signaled by have_out_data_for()), sleep up to `timeout`
 * on it, trace-log the wait, then clear the slot. Returns the timedwait
 * status (e.g. APR_TIMEUP) or APR_ECONNABORTED on an aborted mplx.
 * NOTE(review): line-sampled excerpt -- abort pre-check condition and the
 * final return are not visible. */
709 apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout,
710 apr_thread_cond_t *iowait)
715 return APR_ECONNABORTED;
717 status = apr_thread_mutex_lock(m->lock);
718 if (APR_SUCCESS == status) {
719 m->added_output = iowait;
720 status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
721 if (APLOGctrace2(m->c)) {
722 ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
723 "h2_mplx(%ld): trywait on data for %f ms)",
724 m->id, timeout/1000.0);
726 m->added_output = NULL;
727 apr_thread_mutex_unlock(m->lock);
/* Wake the session if it is parked in h2_mplx_out_trywait() waiting for
 * output. Caller must hold m->lock. NOTE(review): the use of stream_id
 * (if any) is not visible in this line-sampled excerpt. */
732 static void have_out_data_for(h2_mplx *m, int stream_id)
736 if (m->added_output) {
737 apr_thread_cond_signal(m->added_output);
/* Schedule a task for execution by appending it to the task queue under
 * the lock; returns APR_ECONNABORTED on an aborted mplx. Workers pick
 * tasks up via h2_mplx_pop_task(). NOTE(review): line-sampled excerpt --
 * the abort pre-check, a likely workers_register()/notify step after
 * queuing, and the final return are not visible. */
741 apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
746 return APR_ECONNABORTED;
748 status = apr_thread_mutex_lock(m->lock);
749 if (APR_SUCCESS == status) {
750 /* TODO: needs to sort queue by priority */
751 ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
752 "h2_mplx: do task(%s)", task->id);
753 h2_tq_append(m->q, task);
754 apr_thread_mutex_unlock(m->lock);
/* Worker-side dequeue: pop the first task from the queue, mark it started,
 * and report via *has_more whether further tasks remain queued. Returns
 * NULL when the queue is empty (or the lock could not be taken).
 * NOTE(review): line-sampled excerpt -- a NULL guard around the popped
 * task and `return task;` are not visible. */
760 h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
762 h2_task *task = NULL;
769 status = apr_thread_mutex_lock(m->lock);
770 if (APR_SUCCESS == status) {
771 task = h2_tq_pop_first(m->q);
773 h2_task_set_started(task);
775 *has_more = !h2_tq_empty(m->q);
776 apr_thread_mutex_unlock(m->lock);
781 apr_status_t h2_mplx_create_task(h2_mplx *m, struct h2_stream *stream)
786 return APR_ECONNABORTED;
788 status = apr_thread_mutex_lock(m->lock);
789 if (APR_SUCCESS == status) {
790 conn_rec *c = h2_conn_create(m->c, stream->pool);
791 stream->task = h2_task_create(m->id, stream->id,
794 apr_thread_mutex_unlock(m->lock);