return 0;
}
-static void have_out_data_for(h2_mplx *m, int stream_id);
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response);
static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
static void check_tx_reservation(h2_mplx *m)
m->input_consumed_ctx = ctx;
}
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+ h2_mplx *m = ctx;
+ apr_status_t status;
+ h2_stream *stream;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, beam->id);
+ if (stream) {
+ have_out_data_for(m, stream, 0);
+ }
+ leave_mutex(m, acquired);
+ }
+}
+
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem);
h2_beam_timeout_set(task->output.beam, m->stream_timeout);
h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
+ h2_beam_on_produced(task->output.beam, output_produced, m);
m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
if (!task->output.copy_files) {
h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
task->output.opened = 1;
}
- h2_ihash_add(m->sready, stream);
if (response && response->http_status < 300) {
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
check_tx_reservation(m);
}
- have_out_data_for(m, stream_id);
+ have_out_data_for(m, stream, 1);
return status;
}
APLOG_TRACE2);
}
output_consumed_signal(m, task);
- have_out_data_for(m, task->stream_id);
+ have_out_data_for(m, stream, 0);
return status;
}
return status;
}
-static void have_out_data_for(h2_mplx *m, int stream_id)
+static void have_out_data_for(h2_mplx *m, h2_stream *stream, int response)
{
- (void)stream_id;
- AP_DEBUG_ASSERT(m);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ h2_ihash_t *set;
+ ap_assert(m);
+ ap_assert(stream);
+
+ set = response? m->sready : m->sresume;
+ if (!h2_ihash_get(set, stream->id)) {
+ h2_ihash_add(set, stream);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
}
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream still open",
task->id);
- if (h2_stream_is_suspended(stream)) {
- /* more data will not arrive, resume the stream */
- h2_ihash_add(m->sresume, stream);
- have_out_data_for(m, stream->id);
- }
+ /* more data will not arrive, resume the stream */
+ have_out_data_for(m, stream, 0);
}
else {
/* stream no longer active, was it placed in hold? */
return status;
}
-static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
-{
- h2_mplx *m = ctx;
- apr_status_t status;
- h2_stream *stream;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- stream = h2_ihash_get(m->streams, beam->id);
- if (stream && h2_stream_is_suspended(stream)) {
- h2_ihash_add(m->sresume, stream);
- h2_beam_on_produced(beam, NULL, NULL);
- have_out_data_for(m, beam->id);
- }
- leave_mutex(m, acquired);
- }
-}
-
apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
{
apr_status_t status;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, stream_id);
- if (stream) {
+ if (stream && !h2_ihash_get(m->sresume, stream->id)) {
+ /* not marked for resume again already */
h2_stream_set_suspended(stream, 1);
task = h2_ihash_get(m->tasks, stream->id);
if (stream->started && (!task || task->worker_done)) {
h2_ihash_add(m->sresume, stream);
}
- else if (task->output.beam) {
- /* register callback so that we can resume on new output */
- h2_beam_on_produced(task->output.beam, output_produced, m);
- }
}
leave_mutex(m, acquired);
}