Changes with Apache 2.4.24
+ *) mod_http2: fix suspended handling for streams. Output could become
+ blocked in rare cases.
+
*) mpm_winnt: Prevent a denial of service when the 'data' AcceptFilter is in
use by replacing it with the 'connect' filter. PR 59970. [Jacob Champion]
}
}
+static void report_consumption(h2_bucket_beam *beam, int force)
+{
+ if (force || beam->received_bytes != beam->reported_consumed_bytes) {
+ if (beam->consumed_fn) {
+ beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
+ - beam->reported_consumed_bytes);
+ }
+ beam->reported_consumed_bytes = beam->received_bytes;
+ }
+}
+
+static void report_production(h2_bucket_beam *beam, int force)
+{
+ if (force || beam->sent_bytes != beam->reported_produced_bytes) {
+ if (beam->produced_fn) {
+ beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
+ - beam->reported_produced_bytes);
+ }
+ beam->reported_produced_bytes = beam->sent_bytes;
+ }
+}
+
static apr_off_t calc_buffered(h2_bucket_beam *beam)
{
apr_off_t len = 0;
*premain = calc_space_left(beam);
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
- apr_status_t status = wait_cond(beam, pbl->mutex);
+ apr_status_t status;
+ report_production(beam, 1);
+ status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
}
}
}
-static void report_consumption(h2_bucket_beam *beam, int force)
-{
- if (force || beam->received_bytes != beam->reported_consumed_bytes) {
- if (beam->consumed_fn) {
- beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
- - beam->reported_consumed_bytes);
- }
- beam->reported_consumed_bytes = beam->received_bytes;
- }
-}
-
-static void report_production(h2_bucket_beam *beam, int force)
-{
- if (force || beam->sent_bytes != beam->reported_produced_bytes) {
- if (beam->produced_fn) {
- beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
- - beam->reported_produced_bytes);
- }
- beam->reported_produced_bytes = beam->sent_bytes;
- }
-}
-
static void h2_blist_cleanup(h2_blist *bl)
{
apr_bucket *e;
}
if (transferred) {
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
status = APR_SUCCESS;
}
else if (beam->closed) {
goto transfer;
}
else {
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
status = APR_EAGAIN;
}
leave:
return 0;
}
-static void have_out_data_for(h2_mplx *m, int stream_id);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
static void check_tx_reservation(h2_mplx *m)
if (task->input.beam) {
h2_beam_abort(task->input.beam);
}
- if (task->output.beam) {
+ if (task->worker_started && !task->worker_done && task->output.beam) {
h2_beam_abort(task->output.beam);
}
return 1;
m->input_consumed_ctx = ctx;
}
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+ h2_mplx *m = ctx;
+ apr_status_t status;
+ h2_stream *stream;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, beam->id);
+ if (stream) {
+ have_out_data_for(m, stream, 0);
+ }
+ leave_mutex(m, acquired);
+ }
+}
+
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
h2_beam_timeout_set(task->output.beam, m->stream_timeout);
h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+ h2_beam_on_produced(task->output.beam, output_produced, m);
m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
if (!task->output.copy_files) {
h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
task->output.opened = 1;
}
- h2_ihash_add(m->sready, stream);
if (response && response->http_status < 300) {
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
check_tx_reservation(m);
}
- have_out_data_for(m, stream_id);
+ have_out_data_for(m, stream, 1);
return status;
}
APLOG_TRACE2);
}
output_consumed_signal(m, task);
- have_out_data_for(m, task->stream_id);
+ have_out_data_for(m, stream, 0);
return status;
}
return status;
}
-static void have_out_data_for(h2_mplx *m, int stream_id)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
{
- (void)stream_id;
- AP_DEBUG_ASSERT(m);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ h2_ihash_t *set;
+ ap_assert(m);
+ ap_assert(stream);
+
+ set = response? m->sready : m->sresume;
+ if (!h2_ihash_get(set, stream->id)) {
+ h2_ihash_add(set, stream);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream still open",
task->id);
- if (h2_stream_is_suspended(stream)) {
- /* more data will not arrive, resume the stream */
- h2_ihash_add(m->sresume, stream);
- have_out_data_for(m, stream->id);
- }
+ /* more data will not arrive, resume the stream */
+ have_out_data_for(m, stream, 0);
}
else {
/* stream no longer active, was it placed in hold? */
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_mplx(%ld-%d): on_resume",
m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task && task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
h2_stream_set_suspended(stream, 0);
status = on_resume(on_ctx, stream->id);
}
return status;
}
-static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
-{
- h2_mplx *m = ctx;
- apr_status_t status;
- h2_stream *stream;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream = h2_ihash_get(m->streams, beam->id);
- if (stream && h2_stream_is_suspended(stream)) {
- h2_ihash_add(m->sresume, stream);
- h2_beam_on_produced(beam, NULL, NULL);
- have_out_data_for(m, beam->id);
- }
- leave_mutex(m, acquired);
- }
-}
-
apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, stream_id);
- if (stream) {
+ if (stream && !h2_ihash_get(m->sresume, stream->id)) {
+ /* not marked for resume again already */
h2_stream_set_suspended(stream, 1);
task = h2_ihash_get(m->tasks, stream->id);
if (stream->started && (!task || task->worker_done)) {
h2_ihash_add(m->sresume, stream);
}
- else if (task->output.beam) {
- /* register callback so that we can resume on new output */
- h2_beam_on_produced(task->output.beam, output_produced, m);
- }
}
leave_mutex(m, acquired);
}
return status;
}
+
+int h2_mplx_is_busy(h2_mplx *m)
+{
+ apr_status_t status;
+ int acquired, busy = 1;
+
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ if (h2_ihash_empty(m->streams)) {
+ busy = 0;
+ }
+ if (h2_iq_empty(m->q) && h2_ihash_empty(m->tasks)) {
+ busy = 0;
+ }
+ leave_mutex(m, acquired);
+ }
+ return busy;
+}
*/
apr_uint32_t h2_mplx_shutdown(h2_mplx *m);
+int h2_mplx_is_busy(h2_mplx *m);
+
/*******************************************************************************
* IO lifetime of streams.
******************************************************************************/
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
- AP_DEBUG_ASSERT(!h2_stream_is_suspended(stream));
-
status = h2_stream_out_prepare(stream, &nread, &eos);
if (nread) {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
case APR_SUCCESS:
break;
+ case APR_ECONNABORTED:
case APR_ECONNRESET:
return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
- stream->id, stream->rst_error);
+ stream->id, H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
case APR_EAGAIN:
/* If there is no data available, our session will automatically
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_stream(%ld-%d): on_resume", session->id, stream_id);
if (stream) {
- int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ int rv;
+ if (stream->rst_error) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+ "h2_stream(%ld-%d): RST_STREAM, err=%d",
+ session->id, stream->id, stream->rst_error);
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, stream->rst_error);
+ }
+ else {
+ rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ }
session->have_written = 1;
ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
APLOG_ERR : APLOG_DEBUG, 0, session->c,
if (!stream) {
return APR_NOTFOUND;
}
- else if (!stream->response) {
+ else if (stream->rst_error || !stream->response) {
int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
return rstatus;
}
-static int unsubmitted_iter(void *ctx, void *val)
-{
- h2_stream *stream = val;
- if (h2_stream_needs_submit(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-static int has_unsubmitted_streams(h2_session *session)
-{
- int has_unsubmitted = 0;
- h2_ihash_iter(session->streams, unsubmitted_iter, &has_unsubmitted);
- return has_unsubmitted;
-}
-
-static int suspended_iter(void *ctx, void *val)
-{
- h2_stream *stream = val;
- if (h2_stream_is_suspended(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-static int has_suspended_streams(h2_session *session)
-{
- int has_suspended = 0;
- h2_ihash_iter(session->streams, suspended_iter, &has_suspended);
- return has_suspended;
-}
-
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
session->id, session->open_streams);
h2_conn_io_flush(&session->io);
if (session->open_streams > 0) {
- if (has_unsubmitted_streams(session)
- || has_suspended_streams(session)) {
+ if (h2_mplx_is_busy(session->mplx)) {
/* waiting for at least one stream to produce data */
transit(session, "no io", H2_SESSION_ST_WAIT);
}
}
status = h2_beam_receive(stream->output, stream->buffer,
APR_NONBLOCK_READ, amount);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ "h2_stream(%ld-%d): beam_received",
+ stream->session->id, stream->id);
/* The buckets we reveive are using the stream->buffer pool as
* lifetime which is exactly what we want since this is stream->pool.
*
+++ /dev/null
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-#include <stddef.h>
-
-#include <apr_hash.h>
-#include <apr_strings.h>
-
-#include <httpd.h>
-#include <http_log.h>
-
-#include "h2_private.h"
-#include "h2_stream.h"
-#include "h2_stream_set.h"
-
-
-struct h2_stream_set {
- apr_hash_t *hash;
-};
-
-static unsigned int stream_hash(const char *key, apr_ssize_t *klen)
-{
- return (unsigned int)(*((int*)key));
-}
-
-h2_stream_set *h2_stream_set_create(apr_pool_t *pool, int max)
-{
- h2_stream_set *sp = apr_pcalloc(pool, sizeof(h2_stream_set));
- sp->hash = apr_hash_make_custom(pool, stream_hash);
-
- return sp;
-}
-
-void h2_stream_set_destroy(h2_stream_set *sp)
-{
- (void)sp;
-}
-
-h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id)
-{
- return apr_hash_get(sp->hash, &stream_id, sizeof(stream_id));
-}
-
-void h2_stream_set_add(h2_stream_set *sp, h2_stream *stream)
-{
- apr_hash_set(sp->hash, &stream->id, sizeof(stream->id), stream);
-}
-
-void h2_stream_set_remove(h2_stream_set *sp, int stream_id)
-{
- apr_hash_set(sp->hash, &stream_id, sizeof(stream_id), NULL);
-}
-
-int h2_stream_set_is_empty(h2_stream_set *sp)
-{
- return apr_hash_count(sp->hash) == 0;
-}
-
-apr_size_t h2_stream_set_size(h2_stream_set *sp)
-{
- return apr_hash_count(sp->hash);
-}
-
-typedef struct {
- h2_stream_set_iter_fn *iter;
- void *ctx;
-} iter_ctx;
-
-static int hash_iter(void *ctx, const void *key, apr_ssize_t klen,
- const void *val)
-{
- iter_ctx *ictx = ctx;
- return ictx->iter(ictx->ctx, (h2_stream*)val);
-}
-
-void h2_stream_set_iter(h2_stream_set *sp,
- h2_stream_set_iter_fn *iter, void *ctx)
-{
- iter_ctx ictx;
- ictx.iter = iter;
- ictx.ctx = ctx;
- apr_hash_do(hash_iter, &ictx, sp->hash);
-}
-
-static int unsubmitted_iter(void *ctx, h2_stream *stream)
-{
- if (h2_stream_needs_submit(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-int h2_stream_set_has_unsubmitted(h2_stream_set *sp)
-{
- int has_unsubmitted = 0;
- h2_stream_set_iter(sp, unsubmitted_iter, &has_unsubmitted);
- return has_unsubmitted;
-}
-
-static int input_open_iter(void *ctx, h2_stream *stream)
-{
- if (h2_stream_input_is_open(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-int h2_stream_set_has_open_input(h2_stream_set *sp)
-{
- int has_input_open = 0;
- h2_stream_set_iter(sp, input_open_iter, &has_input_open);
- return has_input_open;
-}
-
-static int suspended_iter(void *ctx, h2_stream *stream)
-{
- if (h2_stream_is_suspended(stream)) {
- *((int *)ctx) = 1;
- return 0;
- }
- return 1;
-}
-
-int h2_stream_set_has_suspended(h2_stream_set *sp)
-{
- int has_suspended = 0;
- h2_stream_set_iter(sp, suspended_iter, &has_suspended);
- return has_suspended;
-}
-
+++ /dev/null
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_stream_set__
-#define __mod_h2__h2_stream_set__
-
-/**
- * A set of h2_stream instances. Allows lookup by stream id
- * and other criteria.
- */
-
-typedef h2_stream *h2_stream_set_match_fn(void *ctx, h2_stream *stream);
-typedef int h2_stream_set_iter_fn(void *ctx, h2_stream *stream);
-
-typedef struct h2_stream_set h2_stream_set;
-
-
-h2_stream_set *h2_stream_set_create(apr_pool_t *pool, int max);
-
-void h2_stream_set_destroy(h2_stream_set *sp);
-
-void h2_stream_set_add(h2_stream_set *sp, h2_stream *stream);
-
-h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id);
-
-void h2_stream_set_remove(h2_stream_set *sp, int stream_id);
-
-void h2_stream_set_iter(h2_stream_set *sp,
- h2_stream_set_iter_fn *iter, void *ctx);
-
-int h2_stream_set_is_empty(h2_stream_set *sp);
-
-apr_size_t h2_stream_set_size(h2_stream_set *sp);
-
-int h2_stream_set_has_unsubmitted(h2_stream_set *sp);
-int h2_stream_set_has_open_input(h2_stream_set *sp);
-int h2_stream_set_has_suspended(h2_stream_set *sp);
-
-#endif /* defined(__mod_h2__h2_stream_set__) */
h2_task_logio_add_bytes_out(task->c, written);
}
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
+ "h2_task(%s): send_out (%ld bytes)",
+ task->id, (long)written);
+ }
return status;
}
apr_bucket_brigade* brigade)
{
h2_task *task = h2_ctx_cget_task(filter->c);
- AP_DEBUG_ASSERT(task);
- return output_write(task, filter, brigade);
+ apr_status_t status;
+
+ ap_assert(task);
+ status = output_write(task, filter, brigade);
+ if (status != APR_SUCCESS) {
+ h2_task_rst(task, H2_ERR_INTERNAL_ERROR);
+ }
+ return status;
}
static apr_status_t h2_filter_read_response(ap_filter_t* filter,
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.6.1"
+#define MOD_HTTP2_VERSION "1.6.2"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010601
+#define MOD_HTTP2_VERSION_NUM 0x010602
#endif /* mod_h2_h2_version_h */