}
}
+static void recv_buffer_cleanup(h2_bucket_beam *beam, h2_beam_lock *bl)
+{
+ if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+ apr_bucket_brigade *bb = beam->recv_buffer;
+ apr_off_t bblen = 0;
+
+ beam->recv_buffer = NULL;
+ apr_brigade_length(bb, 0, &bblen);
+ beam->received_bytes += bblen;
+
+ /* need to do this unlocked since bucket destroy might
+ * call this beam again. */
+ if (bl) leave_yellow(beam, bl);
+ apr_brigade_destroy(bb);
+ if (bl) enter_yellow(beam, bl);
+
+ if (beam->cons_ev_cb) {
+ beam->cons_ev_cb(beam->cons_ctx, beam);
+ }
+ }
+}
+
static apr_status_t beam_cleanup(void *data)
{
h2_bucket_beam *beam = data;
pool_kill(beam, beam->recv_pool, beam_recv_cleanup);
beam->recv_pool = NULL;
}
- if (beam->recv_buffer) {
- apr_brigade_destroy(beam->recv_buffer);
- beam->recv_buffer = NULL;
- }
+ recv_buffer_cleanup(beam, NULL);
}
else {
beam->recv_buffer = NULL;
h2_beam_lock bl;
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
- apr_brigade_cleanup(beam->recv_buffer);
- }
+ recv_buffer_cleanup(beam, &bl);
beam->aborted = 1;
beam_close(beam);
leave_yellow(beam, &bl);
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
if (beam->aborted) {
- if (beam->recv_buffer && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
- apr_brigade_cleanup(beam->recv_buffer);
- }
+ recv_buffer_cleanup(beam, &bl);
status = APR_ECONNABORTED;
goto leave;
}
h2_stream *stream = val;
h2_task *task = stream->task;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d"),
- !!stream->task, stream->scheduled, h2_stream_is_ready(stream));
+ H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, "
+ "out_buffer=%ld"),
+ !!stream->task, stream->scheduled, h2_stream_is_ready(stream),
+ (long)h2_beam_get_buffered(stream->output));
if (task) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
H2_STRM_MSG(stream, "->03198: %s %s %s"
/* stream not cleaned up, stay around */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, stream open"));
- /* more data will not arrive, resume the stream */
- check_data_for(m, stream, 0);
-
if (stream->input) {
h2_beam_leave(stream->input);
h2_beam_mutex_disable(stream->input);
if (stream->output) {
h2_beam_mutex_disable(stream->output);
}
+
+ /* more data will not arrive, resume the stream */
+ check_data_for(m, stream, 0);
}
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
H2_MPLX_ENTER(m);
scount = h2_ihash_count(m->streams);
- if (scount > 0 && m->tasks_active) {
- /* If we have streams in connection state 'IDLE', meaning
- * all streams are ready to sent data out, but lack
- * WINDOW_UPDATEs.
- *
- * This is ok, unless we have streams that still occupy
- * h2 workers. As worker threads are a scarce resource,
- * we need to take measures that we do not get DoSed.
- *
- * This is what we call an 'idle block'. Limit the amount
- * of busy workers we allow for this connection until it
- * well behaves.
- */
- now = apr_time_now();
- m->last_idle_block = now;
- if (m->limit_active > 2
- && now - m->last_limit_change >= m->limit_change_interval) {
- if (m->limit_active > 16) {
- m->limit_active = 16;
- }
- else if (m->limit_active > 8) {
- m->limit_active = 8;
- }
- else if (m->limit_active > 4) {
- m->limit_active = 4;
+ if (scount > 0) {
+ if (m->tasks_active) {
+ /* If we have streams in connection state 'IDLE', meaning
+ * all streams are ready to sent data out, but lack
+ * WINDOW_UPDATEs.
+ *
+ * This is ok, unless we have streams that still occupy
+ * h2 workers. As worker threads are a scarce resource,
+ * we need to take measures that we do not get DoSed.
+ *
+ * This is what we call an 'idle block'. Limit the amount
+ * of busy workers we allow for this connection until it
+ * well behaves.
+ */
+ now = apr_time_now();
+ m->last_idle_block = now;
+ if (m->limit_active > 2
+ && now - m->last_limit_change >= m->limit_change_interval) {
+ if (m->limit_active > 16) {
+ m->limit_active = 16;
+ }
+ else if (m->limit_active > 8) {
+ m->limit_active = 8;
+ }
+ else if (m->limit_active > 4) {
+ m->limit_active = 4;
+ }
+ else if (m->limit_active > 2) {
+ m->limit_active = 2;
+ }
+ m->last_limit_change = now;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): decrease worker limit to %d",
+ m->id, m->limit_active);
}
- else if (m->limit_active > 2) {
- m->limit_active = 2;
+
+ if (m->tasks_active > m->limit_active) {
+ status = unschedule_slow_tasks(m);
}
- m->last_limit_change = now;
+ }
+ else if (!h2_iq_empty(m->q)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): decrease worker limit to %d",
- m->id, m->limit_active);
+ "h2_mplx(%ld): idle, but %d streams to process",
+ m->id, (int)h2_iq_count(m->q));
+ status = APR_EAGAIN;
}
-
- if (m->tasks_active > m->limit_active) {
- status = unschedule_slow_tasks(m);
+ else {
+ /* idle, have streams, but no tasks active. what are we waiting for?
+ * WINDOW_UPDATEs from client? */
+ h2_stream *stream = NULL;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): idle, no tasks ongoing, %d streams",
+ m->id, (int)h2_ihash_count(m->streams));
+ h2_ihash_shift(m->streams, (void**)&stream, 1);
+ if (stream && stream->output) {
+ /* FIXME: this looks like a race between the session thinking
+ * it is idle and the EOF on a stream not being sent.
+ * Signal to caller to leave IDLE state.
+ */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "output closed=%d, mplx idle"
+ ", out has %ld bytes buffered"),
+ h2_beam_is_closed(stream->output),
+ (long)h2_beam_get_buffered(stream->output));
+ h2_ihash_add(m->streams, stream);
+ check_data_for(m, stream, 0);
+ status = APR_EAGAIN;
+ }
}
}
register_if_needed(m);
if (h2_ihash_empty(m->streams)) {
waiting = 0;
}
- if ((h2_fifo_count(m->readyq) == 0)
- && h2_iq_empty(m->q) && !m->tasks_active) {
+ else if (!m->tasks_active && !h2_fifo_count(m->readyq)
+ && h2_iq_empty(m->q)) {
waiting = 0;
}