Changes with Apache 2.4.21
+ *) mod_http2: improved event handling for suspended streams, responses
+ and window updates. [Stefan Eissing]
+
*) mod_proxy_hcheck: Provide for dynamic background health
checks on reverse proxies associated with BalancerMember
workers. [Jim Jagielski]
}
}
-static void report_consumption(h2_bucket_beam *beam)
+static void report_consumption(h2_bucket_beam *beam, int force)
{
- if (beam->consumed_fn && (beam->received_bytes != beam->reported_bytes)) {
- beam->consumed_fn(beam->consumed_ctx, beam,
- beam->received_bytes - beam->reported_bytes);
- beam->reported_bytes = beam->received_bytes;
+ if (force || beam->received_bytes != beam->reported_consumed_bytes) {
+ if (beam->consumed_fn) {
+ beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
+ - beam->reported_consumed_bytes);
+ }
+ beam->reported_consumed_bytes = beam->received_bytes;
+ }
+}
+
+static void report_production(h2_bucket_beam *beam, int force)
+{
+ if (force || beam->sent_bytes != beam->reported_produced_bytes) {
+ if (beam->produced_fn) {
+ beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
+ - beam->reported_produced_bytes);
+ }
+ beam->reported_produced_bytes = beam->sent_bytes;
}
}
beam_close(beam);
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
- report_consumption(beam);
+ report_consumption(beam, 0);
h2_blist_cleanup(&beam->purge);
h2_blist_cleanup(&beam->hold);
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
beam->aborted = 1;
- report_consumption(beam);
+ report_consumption(beam, 0);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_reds(beam);
beam_close(beam);
- report_consumption(beam);
+ report_consumption(beam, 0);
leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
beam_close(beam);
- report_consumption(beam);
+ report_consumption(beam, 0);
while (status == APR_SUCCESS
&& (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
status = APR_ECONNABORTED;
}
else if (red_brigade) {
+ int force_report = !APR_BRIGADE_EMPTY(red_brigade);
while (!APR_BRIGADE_EMPTY(red_brigade)
&& status == APR_SUCCESS) {
bred = APR_BRIGADE_FIRST(red_brigade);
status = append_bucket(beam, bred, block, beam->red_pool, &bl);
}
+ report_production(beam, force_report);
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
}
- report_consumption(beam);
+ report_consumption(beam, 0);
leave_yellow(beam, &bl);
}
return status;
if (APR_BUCKET_IS_METADATA(bred)) {
if (APR_BUCKET_IS_EOS(bred)) {
- beam->close_sent = 1;
bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+ beam->close_sent = 1;
}
else if (APR_BUCKET_IS_FLUSH(bred)) {
bgreen = apr_bucket_flush_create(bb->bucket_alloc);
}
}
}
-
- if (transferred) {
- status = APR_SUCCESS;
- }
- else if (beam->closed) {
+
+ if (beam->closed
+ && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
+ && H2_BLIST_EMPTY(&beam->red)) {
+ /* beam is closed and we have nothing more to receive */
if (!beam->close_sent) {
apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
beam->close_sent = 1;
+ ++transferred;
status = APR_SUCCESS;
}
- else {
- status = APR_EOF;
- }
+ }
+
+ if (transferred) {
+ status = APR_SUCCESS;
+ }
+ else if (beam->closed) {
+ status = APR_EOF;
}
else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
status = wait_cond(beam, bl.mutex);
}
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_consumed_callback *cb, void *ctx)
+ h2_beam_io_callback *cb, void *ctx)
{
h2_beam_lock bl;
}
}
+void h2_beam_on_produced(h2_bucket_beam *beam,
+ h2_beam_io_callback *cb, void *ctx)
+{
+ h2_beam_lock bl;
+
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ beam->produced_fn = cb;
+ beam->produced_ctx = ctx;
+ leave_yellow(beam, &bl);
+ }
+}
+
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx)
{
typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
-typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
- apr_off_t bytes);
+typedef void h2_beam_io_callback(void *ctx, h2_bucket_beam *beam,
+ apr_off_t bytes);
typedef struct h2_beam_proxy h2_beam_proxy;
typedef struct {
apr_pool_t *red_pool;
apr_size_t max_buf_size;
- apr_size_t files_beamed; /* how many file handles have been set aside */
- apr_file_t *last_beamed; /* last file beamed */
+ apr_interval_time_t timeout;
+
apr_off_t sent_bytes; /* amount of bytes send */
apr_off_t received_bytes; /* amount of bytes received */
- apr_off_t reported_bytes; /* amount of bytes reported as consumed */
- apr_size_t buckets_sent;
+
+ apr_size_t buckets_sent; /* # of beam buckets sent */
+ apr_size_t files_beamed; /* how many file handles have been set aside */
+ apr_file_t *last_beamed; /* last file beamed */
unsigned int aborted : 1;
unsigned int closed : 1;
void *m_ctx;
h2_beam_mutex_enter *m_enter;
struct apr_thread_cond_t *m_cond;
- apr_interval_time_t timeout;
- h2_beam_consumed_callback *consumed_fn;
+ apr_off_t reported_consumed_bytes; /* amount of bytes reported as consumed */
+ h2_beam_io_callback *consumed_fn;
void *consumed_ctx;
+ apr_off_t reported_produced_bytes; /* amount of bytes reported as produced */
+ h2_beam_io_callback *produced_fn;
+ void *produced_ctx;
h2_beam_can_beam_callback *can_beam_fn;
void *can_beam_ctx;
};
* Call from the red side, callbacks invoked on red side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
- h2_beam_consumed_callback *cb, void *ctx);
+ h2_beam_io_callback *cb, void *ctx);
+
+/**
+ * Register a callback to be invoked on the red side with the
+ * amount of bytes that have been consumed by the red side, since the
+ * last callback invocation or reset.
+ * @param beam the beam to set the callback on
+ * @param cb the callback or NULL
+ * @param ctx the context to use in callback invocation
+ *
+ * Call from the red side, callbacks invoked on red side.
+ */
+void h2_beam_on_produced(h2_bucket_beam *beam,
+ h2_beam_io_callback *cb, void *ctx);
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx);
*pacquired = 0;
return APR_SUCCESS;
}
-
+
+ AP_DEBUG_ASSERT(m->lock);
status = apr_thread_mutex_lock(m->lock);
*pacquired = (status == APR_SUCCESS);
if (*pacquired) {
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
+ m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->stream_timeout = stream_timeout;
static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (stream->input) {
+ if (stream->input && stream->started) {
h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
if (task->output.beam && task->worker_started && task->assigned) {
- h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
+ /* trigger updates */
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
}
return 0;
}
if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
}
slave = task->c;
*/
h2_iq_remove(m->q, stream->id);
h2_ihash_remove(m->sready, stream->id);
+ h2_ihash_remove(m->sresume, stream->id);
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
return 1;
}
+static int report_stream_iter(void *ctx, void *val) {
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+ "submitted=%d, suspended=%d",
+ m->id, stream->id, stream->started, stream->scheduled,
+ stream->submitted, stream->suspended);
+ return 1;
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int i, wait_secs = 5;
+
+ if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): release_join with %d streams open, "
+ "%d streams resume, %d streams ready, %d tasks",
+ m->id, (int)h2_ihash_count(m->streams),
+ (int)h2_ihash_count(m->sresume),
+ (int)h2_ihash_count(m->sready),
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->streams, report_stream_iter, m);
+ }
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
purge_streams(m);
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
- AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
if (!h2_ihash_empty(m->tasks)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy, "
"%d tasks still present",
m->id, (int)h2_ihash_count(m->tasks));
m->input_consumed_ctx = ctx;
}
-static int update_window(void *ctx, void *val)
-{
- input_consumed_signal(ctx, val);
- return 1;
-}
-
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->streams, update_window, m);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_session(%ld): windows updated", m->id);
- status = APR_SUCCESS;
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-static int stream_iter_first(void *ctx, void *val)
-{
- h2_stream **pstream = ctx;
- *pstream = val;
- return 0;
-}
-
-h2_stream *h2_mplx_next_submit(h2_mplx *m)
-{
- apr_status_t status;
- h2_stream *stream = NULL;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->sready, stream_iter_first, &stream);
- if (stream) {
- h2_task *task = h2_ihash_get(m->tasks, stream->id);
- h2_ihash_remove(m->sready, stream->id);
- if (task) {
- task->submitted = 1;
- if (task->rst_error) {
- h2_stream_rst(stream, task->rst_error);
- }
- else {
- AP_DEBUG_ASSERT(task->response);
- h2_stream_set_response(stream, task->response,
- task->output.beam);
- }
- }
- else {
- /* We have the stream ready without a task. This happens
- * when we fail streams early. A response should already
- * be present. */
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- }
- }
- leave_mutex(m, acquired);
- }
- return stream;
-}
-
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
if (m->aborted) {
status = APR_ECONNABORTED;
}
+ else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+ status = APR_SUCCESS;
+ }
else {
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (stream->response) {
- /* already have a respone, schedule for submit */
- h2_ihash_add(m->sready, stream);
- }
else {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", 0);
h2_ihash_add(m->streams, stream);
-
- if (!m->need_registration) {
- m->need_registration = h2_iq_empty(m->q);
+ if (stream->response) {
+ /* already have a respone, schedule for submit */
+ h2_ihash_add(m->sready, stream);
}
- if (m->workers_busy < m->workers_max) {
- do_registration = m->need_registration;
+ else {
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
+ }
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, stream->request->body);
}
- h2_iq_add(m->q, stream->id, cmp, ctx);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process, body=%d",
- m->c->id, stream->id, stream->request->body);
}
leave_mutex(m, acquired);
}
if (new_conn) {
h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
}
+ stream->started = 1;
task->worker_started = 1;
task->started_at = apr_time_now();
if (sid > m->max_stream_started) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream still open",
task->id);
+ if (h2_stream_is_suspended(stream)) {
+ /* more data will not arrive, resume the stream */
+ h2_ihash_add(m->sresume, stream);
+ have_out_data_for(m, stream->id);
+ }
}
else {
/* stream done, was it placed in hold? */
}
}
}
-
+
+/*******************************************************************************
+ * mplx master events dispatching
+ ******************************************************************************/
+
+static int update_window(void *ctx, void *val)
+{
+ input_consumed_signal(ctx, val);
+ return 1;
+}
+
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
+ stream_ev_callback *on_resume,
+ stream_ev_callback *on_response,
+ void *on_ctx)
+{
+ apr_status_t status;
+ int acquired;
+ int streams[32];
+ h2_stream *stream;
+ h2_task *task;
+ size_t i, n;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, update_window, m);
+
+ if (on_response && !h2_ihash_empty(m->sready)) {
+ n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_response",
+ m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response, task->output.beam);
+ }
+ }
+ else {
+ /* We have the stream ready without a task. This happens
+ * when we fail streams early. A response should already
+ * be present. */
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ }
+ status = on_response(on_ctx, stream->id);
+ }
+ }
+
+ if (on_resume && !h2_ihash_empty(m->sresume)) {
+ n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_resume",
+ m->id, stream->id);
+ h2_stream_set_suspended(stream, 0);
+ status = on_resume(on_ctx, stream->id);
+ }
+ }
+
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
+
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+ h2_mplx *m = ctx;
+ apr_status_t status;
+ h2_stream *stream;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, beam->id);
+ if (stream && h2_stream_is_suspended(stream)) {
+ h2_ihash_add(m->sresume, stream);
+ h2_beam_on_produced(beam, NULL, NULL);
+ have_out_data_for(m, beam->id);
+ }
+ leave_mutex(m, acquired);
+ }
+}
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+{
+ apr_status_t status;
+ h2_stream *stream;
+ h2_task *task;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, stream_id);
+ if (stream) {
+ h2_stream_set_suspended(stream, 1);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (stream->started && (!task || task->worker_done)) {
+ h2_ihash_add(m->sresume, stream);
+ }
+ else {
+ /* register callback so that we can resume on new output */
+ h2_beam_on_produced(task->output.beam, output_produced, m);
+ }
+ }
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
unsigned int need_registration : 1;
struct h2_ihash_t *streams; /* all streams currently processing */
- struct h2_ihash_t *sready; /* all streams ready for response */
struct h2_ihash_t *shold; /* all streams done with task ongoing */
struct h2_ihash_t *spurge; /* all streams done, ready for destroy */
+
struct h2_iqueue *q; /* all stream ids that need to be started */
+ struct h2_ihash_t *sready; /* all streams ready for response */
+ struct h2_ihash_t *sresume; /* all streams that can be resumed */
struct h2_ihash_t *tasks; /* all tasks started and not destroyed */
struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */
*/
void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
-/*******************************************************************************
- * Input handling of streams.
- ******************************************************************************/
+
+typedef apr_status_t stream_ev_callback(void *ctx, int stream_id);
/**
- * Invoke the consumed callback for all streams that had bytes read since the
- * last call to this function. If no stream had input data consumed, the
- * callback is not invoked.
- * The consumed callback may also be invoked at other times whenever
- * the need arises.
+ * Dispatch events for the master connection, such as
+ * - resume: new output data has arrived for a suspended stream
+ * - response: the response for a stream is ready
*/
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
+ stream_ev_callback *on_resume,
+ stream_ev_callback *on_response,
+ void *ctx);
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id);
/*******************************************************************************
* Output handling of streams.
******************************************************************************/
-/**
- * Get a stream whose response is ready for submit. Will set response and
- * any out data available in stream.
- * @param m the mplxer to get a response from
- * @param bb the brigade to place any existing repsonse body data into
- */
-struct h2_stream *h2_mplx_next_submit(h2_mplx *m);
-
/**
* Opens the output for the given stream with the specified response.
*/
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ h2_mplx_stream_done(session->mplx, stream);
+
+ dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+ return APR_SUCCESS;
+}
+
typedef struct stream_sel_ctx {
h2_session *session;
h2_stream *candidate;
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on, req);
- ++session->unanswered_streams;
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
return status;
}
-typedef struct {
- h2_session *session;
- int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, void *val)
-{
- h2_stream *stream = val;
- resume_ctx *rctx = (resume_ctx*)ctx;
- h2_session *session = rctx->session;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
-
- if (h2_stream_is_suspended(stream)) {
- apr_status_t status;
- apr_off_t len = -1;
- int eos;
-
- status = h2_stream_out_prepare(stream, &len, &eos);
- if (status == APR_SUCCESS) {
- int rv;
- h2_stream_set_suspended(stream, 0);
- ++rctx->resume_count;
-
- rv = nghttp2_session_resume_data(session->ngh2, stream->id);
- ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s, len=%ld, eos=%d",
- session->id, stream->id,
- rv? nghttp2_strerror(rv) : "", (long)len, eos);
- }
- }
- return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- if (session->open_streams && !session->mplx->aborted) {
- resume_ctx ctx;
- ctx.session = session;
- ctx.resume_count = 0;
-
- /* Resume all streams where we have data in the out queue and
- * which had been suspended before. */
- h2_ihash_iter(session->streams, resume_on_data, &ctx);
- return ctx.resume_count;
- }
- return 0;
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
* it. Remember at our h2_stream that we need to do this.
*/
nread = 0;
- h2_stream_set_suspended(stream, 1);
+ h2_mplx_suspend_stream(session->mplx, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
size_t offset;
} nvctx_t;
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
- h2_response *response = h2_stream_get_response(stream);
- int rv = 0;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(response || stream->rst_error);
-
- if (stream->submitted) {
- rv = NGHTTP2_PROTOCOL_ERROR;
- }
- else if (response && response->headers) {
- nghttp2_data_provider provider, *pprovider = NULL;
- h2_ngheader *ngh;
- const h2_priority *prio;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
- session->id, stream->id, response->http_status,
- (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
-
- if (response->content_length != 0) {
- memset(&provider, 0, sizeof(provider));
- provider.source.fd = stream->id;
- provider.read_callback = stream_data_cb;
- pprovider = &provider;
- }
-
- /* If this stream is not a pushed one itself,
- * and HTTP/2 server push is enabled here,
- * and the response is in the range 200-299 *),
- * and the remote side has pushing enabled,
- * -> find and perform any pushes on this stream
- * *before* we submit the stream response itself.
- * This helps clients avoid opening new streams on Link
- * headers that get pushed right afterwards.
- *
- * *) the response code is relevant, as we do not want to
- * make pushes on 401 or 403 codes, neiterh on 301/302
- * and friends. And if we see a 304, we do not push either
- * as the client, having this resource in its cache, might
- * also have the pushed ones as well.
- */
- if (stream->request && !stream->request->initiated_on
- && H2_HTTP_2XX(response->http_status)
- && h2_session_push_enabled(session)) {
-
- h2_stream_submit_pushes(stream);
- }
-
- prio = h2_stream_get_priority(stream);
- if (prio) {
- h2_session_set_prio(session, stream, prio);
- /* no showstopper if that fails for some reason */
- }
-
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->headers);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
- ngh->nv, ngh->nvlen, pprovider);
- }
- else {
- int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
- "h2_stream(%ld-%d): RST_STREAM, err=%d",
- session->id, stream->id, err);
-
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
- }
-
- stream->submitted = 1;
- --session->unanswered_streams;
- if (stream->request && stream->request->initiated_on) {
- ++session->pushes_submitted;
- }
- else {
- ++session->responses_submitted;
- }
-
- if (nghttp2_is_fatal(rv)) {
- status = APR_EGENERAL;
- dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
- APLOGNO(02940) "submit_response: %s",
- nghttp2_strerror(rv));
- }
-
- return status;
-}
-
struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
h2_push *push)
{
return status;
}
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
-{
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): EOS bucket cleanup -> done",
- session->id, stream->id);
- h2_ihash_remove(session->streams, stream->id);
- --session->unanswered_streams;
- h2_mplx_stream_done(session->mplx, stream);
-
- dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
- return APR_SUCCESS;
-}
-
int h2_session_push_enabled(h2_session *session)
{
/* iff we can and they can and want */
if (socket) {
apr_socket_timeout_set(socket, saved_timeout);
}
+ session->have_written = 1;
if (rv != 0) {
if (nghttp2_is_fatal(rv)) {
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
return APR_SUCCESS;
}
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+ if (stream) {
+ int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ session->have_written = 1;
+ ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+ APLOG_ERR : APLOG_DEBUG, 0, session->c,
+ APLOGNO(02936)
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ }
+ return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+ h2_response *response;
+ int rv = 0;
+
+ AP_DEBUG_ASSERT(session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_response", session->id, stream_id);
+ if (!stream) {
+ return APR_NOTFOUND;
+ }
+
+ response = h2_stream_get_response(stream);
+ AP_DEBUG_ASSERT(response || stream->rst_error);
+
+ if (stream->submitted) {
+ rv = NGHTTP2_PROTOCOL_ERROR;
+ }
+ else if (response && response->headers) {
+ nghttp2_data_provider provider, *pprovider = NULL;
+ h2_ngheader *ngh;
+ const h2_priority *prio;
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+
+ if (response->content_length != 0) {
+ memset(&provider, 0, sizeof(provider));
+ provider.source.fd = stream->id;
+ provider.read_callback = stream_data_cb;
+ pprovider = &provider;
+ }
+
+ /* If this stream is not a pushed one itself,
+ * and HTTP/2 server push is enabled here,
+ * and the response is in the range 200-299 *),
+ * and the remote side has pushing enabled,
+ * -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * headers that get pushed right afterwards.
+ *
+ * *) the response code is relevant, as we do not want to
+ * make pushes on 401 or 403 codes, neiterh on 301/302
+ * and friends. And if we see a 304, we do not push either
+ * as the client, having this resource in its cache, might
+ * also have the pushed ones as well.
+ */
+ if (stream->request && !stream->request->initiated_on
+ && H2_HTTP_2XX(response->http_status)
+ && h2_session_push_enabled(session)) {
+
+ h2_stream_submit_pushes(stream);
+ }
+
+ prio = h2_stream_get_priority(stream);
+ if (prio) {
+ h2_session_set_prio(session, stream, prio);
+ /* no showstopper if that fails for some reason */
+ }
+
+ ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
+ response->headers);
+ rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ ngh->nv, ngh->nvlen, pprovider);
+ }
+ else {
+ int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+ "h2_stream(%ld-%d): RST_STREAM, err=%d",
+ session->id, stream->id, err);
+
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, err);
+ }
+
+ stream->submitted = 1;
+ session->have_written = 1;
+
+ if (stream->request && stream->request->initiated_on) {
+ ++session->pushes_submitted;
+ }
+ else {
+ ++session->responses_submitted;
+ }
+
+ if (nghttp2_is_fatal(rv)) {
+ status = APR_EGENERAL;
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ APLOGNO(02940) "submit_response: %s",
+ nghttp2_strerror(rv));
+ }
+
+ ++session->unsent_submits;
+
+ /* Unsent push promises are written immediately, as nghttp2
+ * 1.5.0 realizes internal stream data structures only on
+ * send and we might need them for other submits.
+ * Also, to conserve memory, we send at least every 10 submits
+ * so that nghttp2 does not buffer all outbound items too
+ * long.
+ */
+ if (status == APR_SUCCESS
+ && (session->unsent_promises || session->unsent_submits > 10)) {
+ status = h2_session_send(session);
+ }
+ return status;
+}
+
static apr_status_t h2_session_receive(void *ctx, const char *data,
apr_size_t len, apr_size_t *readlen)
{
return has_suspended;
}
-static apr_status_t h2_session_submit(h2_session *session)
-{
- apr_status_t status = APR_EAGAIN;
- h2_stream *stream;
-
- if (has_unsubmitted_streams(session)) {
- /* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx))) {
- status = submit_response(session, stream);
- ++session->unsent_submits;
-
- /* Unsent push promises are written immediately, as nghttp2
- * 1.5.0 realizes internal stream data structures only on
- * send and we might need them for other submits.
- * Also, to conserve memory, we send at least every 10 submits
- * so that nghttp2 does not buffer all outbound items too
- * long.
- */
- if (status == APR_SUCCESS
- && (session->unsent_promises || session->unsent_submits > 10)) {
- status = h2_session_send(session);
- if (status != APR_SUCCESS) {
- break;
- }
- }
- }
- }
- return status;
-}
-
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
case H2_SESSION_ST_BUSY:
case H2_SESSION_ST_LOCAL_SHUTDOWN:
case H2_SESSION_ST_REMOTE_SHUTDOWN:
- /* nothing for input and output to do. If we remain
- * in this state, we go into a tight loop and suck up
- * CPU cycles. Ideally, we'd like to do a blocking read, but that
- * is not possible if we have scheduled tasks and wait
- * for them to produce something. */
+ /* Nothing to READ, nothing to WRITE on the master connection.
+ * Possible causes:
+ * - we wait for the client to send us sth
+ * - we wait for started tasks to produce output
+ * - we have finished all streams and the client has sent GO_AWAY
+ */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_session(%ld): NO_IO event, %d streams open",
session->id, session->open_streams);
- if (!session->open_streams) {
- if (!is_accepting_streams(session)) {
- /* We are no longer accepting new streams and have
- * finished processing existing ones. Time to leave. */
- h2_session_shutdown(session, arg, msg, 0);
- transit(session, "no io", H2_SESSION_ST_DONE);
+ if (session->open_streams > 0) {
+ if (has_unsubmitted_streams(session)
+ || has_suspended_streams(session)) {
+ /* waiting for at least one stream to produce data */
+ transit(session, "no io", H2_SESSION_ST_WAIT);
}
else {
- apr_time_t now = apr_time_now();
- /* When we have no streams, no task event are possible,
- * switch to blocking reads */
- transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
- session->idle_until = (session->remote.emitted_count?
- session->s->keep_alive_timeout :
- session->s->timeout) + now;
- session->keep_sync_until = now + apr_time_from_sec(1);
+ /* we have streams open, and all are submitted and none
+ * is suspended. The only thing keeping us from WRITEing
+ * more must be the flow control.
+ * This means we only wait for WINDOW_UPDATE from the
+ * client and can block on READ. */
+ transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+ session->idle_until = apr_time_now() + session->s->timeout;
+ session->keep_sync_until = session->idle_until;
+ /* Make sure we have flushed all previously written output
+ * so that the client will react. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
}
}
- else if (!has_unsubmitted_streams(session)
- && !has_suspended_streams(session)) {
- transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
- session->idle_until = apr_time_now() + session->s->timeout;
- session->keep_sync_until = session->idle_until;
- /* none of our streams is waiting for a response or
- * new output data from task processing,
- * switch to blocking reads. We are probably waiting on
- * window updates. */
- if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
- return;
- }
+ else if (is_accepting_streams(session)) {
+ /* When we have no streams, but accept new, switch to idle */
+ apr_time_t now = apr_time_now();
+ transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+ session->idle_until = (session->remote.emitted_count?
+ session->s->keep_alive_timeout :
+ session->s->timeout) + now;
+ session->keep_sync_until = now + apr_time_from_sec(1);
}
else {
- /* Unable to do blocking reads, as we wait on events from
- * task processing in other threads. Do a busy wait with
- * backoff timer. */
- transit(session, "no io", H2_SESSION_ST_WAIT);
+ /* We are no longer accepting new streams and there are
+ * none left. Time to leave. */
+ h2_session_shutdown(session, arg, msg, 0);
+ transit(session, "no io", H2_SESSION_ST_DONE);
}
break;
default:
static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
{
--session->open_streams;
- if (session->open_streams <= 0) {
- }
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 0) {
{
apr_status_t status = APR_SUCCESS;
conn_rec *c = session->c;
- int rv, have_written, have_read, mpm_state;
+ int rv, mpm_state, trace = APLOGctrace3(c);
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): process start, async=%d", session->id, async);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): process start, async=%d",
+ session->id, async);
+ }
if (c->cs) {
c->cs->state = CONN_STATE_WRITE_COMPLETION;
}
while (1) {
- have_read = have_written = 0;
+ trace = APLOGctrace3(c);
+ session->have_read = session->have_written = 0;
if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
if (mpm_state == AP_MPMQ_STOPPING) {
/* make certain, we send everything before we idle */
if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): async idle, nonblock read, "
- "%d streams open", session->id,
- session->open_streams);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): async idle, nonblock read, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We do not return to the async mpm immediately, since under
* load, mpms show the tendency to throw keep_alive connections
* away very rapidly.
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
}
}
else {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): sync idle, stutter 1-sec, "
- "%d streams open", session->id,
- session->open_streams);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): sync idle, stutter 1-sec, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We wait in smaller increments, using a 1 second timeout.
* That gives us the chance to check for MPMQ_STOPPING often.
*/
h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
status = h2_session_read(session, 1);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
session->keep_sync_until = 0;
}
if (now > session->idle_until) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): keepalive timeout",
- session->id);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive timeout",
+ session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
}
- else {
+ else if (trace) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
"h2_session(%ld): keepalive, %f sec left",
session->id, (session->idle_until - now) / 1000000.0f);
/* continue reading handling */
}
else {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): idle(1 sec timeout) "
- "read failed", session->id);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): idle(1 sec timeout) "
+ "read failed", session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
}
}
h2_filter_cin_timeout_set(session->cin, session->s->timeout);
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
}
}
- if (session->open_streams) {
- /* resume any streams with output data */
- h2_session_resume_streams_with_data(session);
- /* Submit any responses/push_promises that are ready */
- status = h2_session_submit(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else if (status != APR_EAGAIN) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "submit error");
- break;
- }
- /* send out window updates for our inputs */
- status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR,
- "window update error");
- break;
- }
+ /* trigger window updates, stream resumes and submits */
+ status = h2_mplx_dispatch_master_events(session->mplx,
+ on_stream_resume,
+ on_stream_response,
+ session);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): dispatch error",
+ session->id);
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR,
+ "dispatch error");
+ break;
}
if (nghttp2_session_want_write(session->ngh2)) {
ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else {
+ if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "writing");
+ H2_ERR_INTERNAL_ERROR, "writing");
break;
}
}
- if (have_read || have_written) {
+ if (session->have_read || session->have_written) {
if (session->wait_us) {
session->wait_us = 0;
}
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
/* waited long enough */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
- "h2_session: wait for data");
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, APR_TIMEUP, c,
+ "h2_session: wait for data");
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
break;
}
session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
}
- if (APLOGctrace1(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
}
out:
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): [%s] process returns",
- session->id, state_name(session->state));
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): [%s] process returns",
+ session->id, state_name(session->state));
+ }
if ((session->state != H2_SESSION_ST_DONE)
&& (APR_STATUS_IS_EOF(status)
unsigned int reprioritize : 1; /* scheduled streams priority changed */
unsigned int eoc_written : 1; /* h2 eoc bucket written */
unsigned int flush : 1; /* flushing output necessary */
+ unsigned int have_read : 1; /* session has read client data */
+ unsigned int have_written : 1; /* session did write data to client */
apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */
struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */
int open_streams; /* number of streams open */
- int unanswered_streams; /* number of streams waiting for response */
int unsent_submits; /* number of submitted, but not yet written responses. */
int unsent_promises; /* number of submitted, but not yet written push promised */
APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
status = APR_SUCCESS;
}
+ else if (status == APR_EAGAIN) {
+ /* did not receive more, it's ok */
+ status = APR_SUCCESS;
+ }
+ *plen = requested;
h2_util_bb_avail(stream->buffer, plen, peos);
}
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_post");
unsigned int aborted : 1; /* was aborted */
unsigned int suspended : 1; /* DATA sending has been suspended */
unsigned int scheduled : 1; /* stream has been scheduled */
+ unsigned int started : 1; /* stream has started processing */
unsigned int submitted : 1; /* response HEADER has been sent */
apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */
return 1;
}
-static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
+ apr_bucket *first, apr_uint64_t chunk_len,
+ apr_bucket *tail)
+{
+ /* Surround the buckets [first, tail[ with new buckets carrying the
+ * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+ * to the end of the brigade. */
+ char buffer[128];
+ apr_bucket *c;
+ int len;
+
+ len = apr_snprintf(buffer, H2_ALEN(buffer),
+ "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
+ c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+ APR_BUCKET_INSERT_BEFORE(first, c);
+ c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+ if (tail) {
+ APR_BUCKET_INSERT_BEFORE(tail, c);
+ }
+ else {
+ APR_BRIGADE_INSERT_TAIL(bb, c);
+ }
+}
+
+static apr_status_t input_handle_eos(h2_task *task, request_rec *r,
+ apr_bucket *b)
{
apr_status_t status = APR_SUCCESS;
apr_bucket_brigade *bb = task->input.bb;
apr_table_t *t = task->request->trailers;
if (task->input.chunked) {
+ task->input.tmp = apr_brigade_split_ex(bb, b, task->input.tmp);
if (t && !apr_is_empty_table(t)) {
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
apr_table_do(input_ser_header, task, t, NULL);
else {
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
}
+ APR_BRIGADE_CONCAT(bb, task->input.tmp);
}
else if (r && t && !apr_is_empty_table(t)){
/* trailers passed in directly. */
apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
}
task->input.eos_written = 1;
+ return status;
+}
+
+static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket_brigade *bb = task->input.bb;
+ apr_table_t *t = task->request->trailers;
+
+ if (task->input.chunked) {
+ if (t && !apr_is_empty_table(t)) {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+ apr_table_do(input_ser_header, task, t, NULL);
+ status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+ }
+ else {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+ }
+ }
+ else if (r && t && !apr_is_empty_table(t)){
+ /* trailers passed in directly. */
+ apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
+ }
APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
+ task->input.eos_written = 1;
return status;
}
apr_read_type_e block, apr_off_t readbytes)
{
apr_status_t status = APR_SUCCESS;
- apr_bucket *b, *next;
+ apr_bucket *b, *next, *first_data;
apr_off_t bblen = 0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
return APR_ECONNABORTED;
}
- if (task->input.bb) {
- /* Cleanup brigades from those nasty 0 length non-meta buckets
- * that apr_brigade_split_line() sometimes produces. */
- for (b = APR_BRIGADE_FIRST(task->input.bb);
- b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
- next = APR_BUCKET_NEXT(b);
- if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
- apr_bucket_delete(b);
- }
+ if (!task->input.bb) {
+ if (!task->input.eos_written) {
+ input_append_eos(task, f->r);
}
- apr_brigade_length(task->input.bb, 0, &bblen);
+ return APR_EOF;
}
- if (bblen == 0) {
- if (task->input.eos_written) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c,
- "h2_task(%s): read no data", task->id);
- return APR_EOF;
- }
- else if (task->input.eos) {
- input_append_eos(task, f->r);
- }
+ /* Cleanup brigades from those nasty 0 length non-meta buckets
+ * that apr_brigade_split_line() sometimes produces. */
+ for (b = APR_BRIGADE_FIRST(task->input.bb);
+ b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
+ apr_bucket_delete(b);
+ }
}
while (APR_BRIGADE_EMPTY(task->input.bb)) {
+ if (task->input.eos_written) {
+ return APR_EOF;
+ }
+
/* Get more input data for our request. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task(%s): get more data from mplx, block=%d, "
return status;
}
- apr_brigade_length(task->input.bb, 0, &bblen);
- if (bblen > 0 && task->input.chunked) {
- /* need to add chunks since request processing expects it */
- char buffer[128];
- apr_bucket *b;
- int len;
-
- len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n",
- (unsigned long)bblen);
- b = apr_bucket_heap_create(buffer, len, NULL,
- task->input.bb->bucket_alloc);
- APR_BRIGADE_INSERT_HEAD(task->input.bb, b);
- status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
- }
-
- if (h2_util_has_eos(task->input.bb, -1)) {
- task->input.eos = 1;
- }
-
- if (task->input.eos && !task->input.eos_written) {
- input_append_eos(task, f->r);
+ /* Inspect the buckets received, detect EOS and apply
+ * chunked encoding if necessary */
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "input.beam recv raw", task->input.bb);
+ first_data = NULL;
+ bblen = 0;
+ for (b = APR_BRIGADE_FIRST(task->input.bb);
+ b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (first_data && task->input.chunked) {
+ make_chunk(task, task->input.bb, first_data, bblen, b);
+ first_data = NULL;
+ bblen = 0;
+ }
+ if (APR_BUCKET_IS_EOS(b)) {
+ task->input.eos = 1;
+ input_handle_eos(task, f->r, b);
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "input.bb after handle eos",
+ task->input.bb);
+ }
+ }
+ else if (b->length == 0) {
+ apr_bucket_delete(b);
+ }
+ else {
+ if (!first_data) {
+ first_data = b;
+ }
+ bblen += b->length;
+ }
}
+ if (first_data && task->input.chunked) {
+ make_chunk(task, task->input.bb, first_data, bblen, NULL);
+ }
if (h2_task_logio_add_bytes_in) {
h2_task_logio_add_bytes_in(f->c, bblen);
}
}
+ if (!task->input.eos_written && task->input.eos) {
+ input_append_eos(task, f->r);
+ }
+
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
"task_input.bb", task->input.bb);
struct {
struct h2_bucket_beam *beam;
apr_bucket_brigade *bb;
+ apr_bucket_brigade *tmp;
apr_read_type_e block;
unsigned int chunked : 1;
unsigned int eos : 1;
apr_hash_set(ih->hash, &id, sizeof(id), NULL);
}
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val)
+{
+ int id = *((int*)((char *)val + ih->ioff));
+ apr_hash_set(ih->hash, &id, sizeof(id), NULL);
+}
+
+
void h2_ihash_clear(h2_ihash_t *ih)
{
apr_hash_clear(ih->hash);
}
+typedef struct {
+ h2_ihash_t *ih;
+ void **buffer;
+ size_t max;
+ size_t len;
+} collect_ctx;
+
+static int collect_iter(void *x, void *val)
+{
+ collect_ctx *ctx = x;
+ if (ctx->len < ctx->max) {
+ ctx->buffer[ctx->len++] = val;
+ return 1;
+ }
+ return 0;
+}
+
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max)
+{
+ collect_ctx ctx;
+ size_t i;
+
+ ctx.ih = ih;
+ ctx.buffer = buffer;
+ ctx.max = max;
+ ctx.len = 0;
+ h2_ihash_iter(ih, collect_iter, &ctx);
+ for (i = 0; i < ctx.len; ++i) {
+ h2_ihash_remove_val(ih, buffer[i]);
+ }
+ return ctx.len;
+}
+
+typedef struct {
+ h2_ihash_t *ih;
+ int *buffer;
+ size_t max;
+ size_t len;
+} icollect_ctx;
+
+static int icollect_iter(void *x, void *val)
+{
+ icollect_ctx *ctx = x;
+ if (ctx->len < ctx->max) {
+ ctx->buffer[ctx->len++] = *((int*)((char *)val + ctx->ih->ioff));
+ return 1;
+ }
+ return 0;
+}
+
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max)
+{
+ icollect_ctx ctx;
+ size_t i;
+
+ ctx.ih = ih;
+ ctx.buffer = buffer;
+ ctx.max = max;
+ ctx.len = 0;
+ h2_ihash_iter(ih, icollect_iter, &ctx);
+ for (i = 0; i < ctx.len; ++i) {
+ h2_ihash_remove(ih, buffer[i]);
+ }
+ return ctx.len;
+}
+
/*******************************************************************************
* ilist - sorted list for structs with int identifier
******************************************************************************/
/* included */
}
else {
- if (maxlen == 0) {
- *pend = b;
- return status;
- }
-
if (b->length == ((apr_size_t)-1)) {
const char *ign;
apr_size_t ilen;
}
}
+ if (maxlen == 0 && b->length > 0) {
+ *pend = b;
+ return status;
+ }
+
if (same_alloc && APR_BUCKET_IS_FILE(b)) {
/* we like it move it, always */
}
return 0;
}
-int h2_util_bb_has_data(apr_bucket_brigade *bb)
-{
- apr_bucket *b;
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b))
- {
- if (!AP_BUCKET_IS_EOR(b)) {
- return 1;
- }
- }
- return 0;
-}
-
apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb,
apr_off_t *plen, int *peos)
{
void h2_ihash_add(h2_ihash_t *ih, void *val);
void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val);
void h2_ihash_clear(h2_ihash_t *ih);
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max);
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max);
+
/*******************************************************************************
* ilist - sorted list for structs with int identifier as first member
******************************************************************************/
* @return != 0 iff brigade holds FLUSH or EOS bucket (or both)
*/
int h2_util_has_eos(apr_bucket_brigade *bb, apr_off_t len);
-int h2_util_bb_has_data(apr_bucket_brigade *bb);
/**
* Check how many bytes of the desired amount are available and if the
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.5.5"
+#define MOD_HTTP2_VERSION "1.5.6"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010505
+#define MOD_HTTP2_VERSION_NUM 0x010506
#endif /* mod_h2_h2_version_h */
--- /dev/null
+# Microsoft Developer Studio Project File - Name="mod_proxy_http2" - Package Owner=<4>
+# Microsoft Developer Studio Generated Build File, Format Version 6.00
+# ** DO NOT EDIT **
+
+# TARGTYPE "Win32 (x86) Dynamic-Link Library" 0x0102
+
+CFG=mod_proxy_http2 - Win32 Release
+!MESSAGE This is not a valid makefile. To build this project using NMAKE,
+!MESSAGE use the Export Makefile command and run
+!MESSAGE
+!MESSAGE NMAKE /f "mod_proxy_http2.mak".
+!MESSAGE
+!MESSAGE You can specify a configuration when running NMAKE
+!MESSAGE by defining the macro CFG on the command line. For example:
+!MESSAGE
+!MESSAGE NMAKE /f "mod_proxy_http2.mak" CFG="mod_proxy_http2 - Win32 Release"
+!MESSAGE
+!MESSAGE Possible choices for configuration are:
+!MESSAGE
+!MESSAGE "mod_proxy_http2 - Win32 Release" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE "mod_proxy_http2 - Win32 Debug" (based on "Win32 (x86) Dynamic-Link Library")
+!MESSAGE
+
+# Begin Project
+# PROP AllowPerConfigDependencies 0
+# PROP Scc_ProjName ""
+# PROP Scc_LocalPath ""
+CPP=cl.exe
+MTL=midl.exe
+RSC=rc.exe
+
+!IF "$(CFG)" == "mod_proxy_http2 - Win32 Release"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 0
+# PROP BASE Output_Dir "Release"
+# PROP BASE Intermediate_Dir "Release"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 0
+# PROP Output_Dir "Release"
+# PROP Intermediate_Dir "Release"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MD /W3 /O2 /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "ssize_t=long" /FD /c
+# ADD CPP /nologo /MD /W3 /O2 /Oy- /Zi /I "../ssl" /I "../../include" /I "../../srclib/apr/include" /I "../../srclib/apr-util/include" /I "../../srclib/nghttp2/lib/includes" /D "NDEBUG" /D "WIN32" /D "_WINDOWS" /D "ssize_t=long" /Fd"Release\mod_proxy_http2_src" /FD /c
+# ADD BASE MTL /nologo /D "NDEBUG" /win32
+# ADD MTL /nologo /D "NDEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "NDEBUG"
+# ADD RSC /l 0x409 /fo"Release/mod_proxy_http2.res" /i "../../include" /i "../../srclib/apr/include" /d "NDEBUG" /d BIN_NAME="mod_proxy_http2.so" /d LONG_NAME="http2_module for Apache"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib nghttp2.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /out:".\Release\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# ADD LINK32 kernel32.lib nghttp2.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Release\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so /opt:ref
+# Begin Special Build Tool
+TargetPath=.\Release\mod_proxy_http2.so
+SOURCE="$(InputPath)"
+PostBuild_Desc=Embed .manifest
+PostBuild_Cmds=if exist $(TargetPath).manifest mt.exe -manifest $(TargetPath).manifest -outputresource:$(TargetPath);2
+# End Special Build Tool
+
+!ELSEIF "$(CFG)" == "mod_proxy_http2 - Win32 Debug"
+
+# PROP BASE Use_MFC 0
+# PROP BASE Use_Debug_Libraries 1
+# PROP BASE Output_Dir "Debug"
+# PROP BASE Intermediate_Dir "Debug"
+# PROP BASE Target_Dir ""
+# PROP Use_MFC 0
+# PROP Use_Debug_Libraries 1
+# PROP Output_Dir "Debug"
+# PROP Intermediate_Dir "Debug"
+# PROP Ignore_Export_Lib 0
+# PROP Target_Dir ""
+# ADD BASE CPP /nologo /MDd /W3 /EHsc /Zi /Od /D "WIN32" /D "_DEBUG" /D "_WINDOWS" /D "ssize_t=long" /FD /c
+# ADD CPP /nologo /MDd /W3 /EHsc /Zi /Od /I "../ssl" /I "../../include" /I "../../srclib/apr/include" /I "../../srclib/apr-util/include" /I "../../srclib/nghttp2/lib/includes" /D "_DEBUG" /D "WIN32" /D "_WINDOWS" /D "ssize_t=long" /Fd"Debug\mod_proxy_http2_src" /FD /c
+# ADD BASE MTL /nologo /D "_DEBUG" /win32
+# ADD MTL /nologo /D "_DEBUG" /mktyplib203 /win32
+# ADD BASE RSC /l 0x409 /d "_DEBUG"
+# ADD RSC /l 0x409 /fo"Debug/mod_proxy_http2.res" /i "../../include" /i "../../srclib/apr/include" /d "_DEBUG" /d BIN_NAME="mod_proxy_http2.so" /d LONG_NAME="http2_module for Apache"
+BSC32=bscmake.exe
+# ADD BASE BSC32 /nologo
+# ADD BSC32 /nologo
+LINK32=link.exe
+# ADD BASE LINK32 kernel32.lib nghttp2d.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Debug\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# ADD LINK32 kernel32.lib nghttp2d.lib /nologo /subsystem:windows /dll /libpath:"..\..\srclib\nghttp2\lib\MSVC_obj" /incremental:no /debug /out:".\Debug\mod_proxy_http2.so" /base:@..\..\os\win32\BaseAddr.ref,mod_proxy_http2.so
+# Begin Special Build Tool
+TargetPath=.\Debug\mod_proxy_http2.so
+SOURCE="$(InputPath)"
+PostBuild_Desc=Embed .manifest
+PostBuild_Cmds=if exist $(TargetPath).manifest mt.exe -manifest $(TargetPath).manifest -outputresource:$(TargetPath);2
+# End Special Build Tool
+
+!ENDIF
+
+# Begin Target
+
+# Name "mod_proxy_http2 - Win32 Release"
+# Name "mod_proxy_http2 - Win32 Debug"
+# Begin Source File
+
+SOURCE=./h2_proxy_session.c
+# End Source File
+# Begin Source File
+
+SOURCE=./h2_util.c
+# End Source File
+# Begin Source File
+
+SOURCE=./mod_proxy_http2.c
+# End Source File
+# Begin Source File
+
+SOURCE=..\..\build\win32\httpd.rc
+# End Source File
+# End Target
+# End Project