}
}
+static void report_consumption(h2_bucket_beam *beam, int force)
+{
+ if (force || beam->received_bytes != beam->reported_consumed_bytes) {
+ if (beam->consumed_fn) {
+ beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
+ - beam->reported_consumed_bytes);
+ }
+ beam->reported_consumed_bytes = beam->received_bytes;
+ }
+}
+
+static void report_production(h2_bucket_beam *beam, int force)
+{
+ if (force || beam->sent_bytes != beam->reported_produced_bytes) {
+ if (beam->produced_fn) {
+ beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
+ - beam->reported_produced_bytes);
+ }
+ beam->reported_produced_bytes = beam->sent_bytes;
+ }
+}
+
static apr_off_t calc_buffered(h2_bucket_beam *beam)
{
apr_off_t len = 0;
*premain = calc_space_left(beam);
while (!beam->aborted && *premain <= 0
&& (block == APR_BLOCK_READ) && pbl->mutex) {
- apr_status_t status = wait_cond(beam, pbl->mutex);
+ apr_status_t status;
+ report_production(beam, 1);
+ status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
}
}
}
-static void report_consumption(h2_bucket_beam *beam, int force)
-{
- if (force || beam->received_bytes != beam->reported_consumed_bytes) {
- if (beam->consumed_fn) {
- beam->consumed_fn(beam->consumed_ctx, beam, beam->received_bytes
- - beam->reported_consumed_bytes);
- }
- beam->reported_consumed_bytes = beam->received_bytes;
- }
-}
-
-static void report_production(h2_bucket_beam *beam, int force)
-{
- if (force || beam->sent_bytes != beam->reported_produced_bytes) {
- if (beam->produced_fn) {
- beam->produced_fn(beam->produced_ctx, beam, beam->sent_bytes
- - beam->reported_produced_bytes);
- }
- beam->reported_produced_bytes = beam->sent_bytes;
- }
-}
-
static void h2_blist_cleanup(h2_blist *bl)
{
apr_bucket *e;
}
if (transferred) {
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
status = APR_SUCCESS;
}
else if (beam->closed) {
goto transfer;
}
else {
+ if (beam->m_cond) {
+ apr_thread_cond_broadcast(beam->m_cond);
+ }
status = APR_EAGAIN;
}
leave:
}
status = h2_beam_receive(stream->output, stream->buffer,
APR_NONBLOCK_READ, amount);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ "h2_stream(%ld-%d): beam_received",
+ stream->session->id, stream->id);
/* The buckets we reveive are using the stream->buffer pool as
* lifetime which is exactly what we want since this is stream->pool.
*
h2_task_logio_add_bytes_out(task->c, written);
}
}
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c,
+ "h2_task(%s): send_out (%ld bytes)",
+ task->id, (long)written);
+ }
return status;
}
apr_bucket_brigade* brigade)
{
h2_task *task = h2_ctx_cget_task(filter->c);
- AP_DEBUG_ASSERT(task);
- return output_write(task, filter, brigade);
+ apr_status_t status;
+
+ ap_assert(task);
+ status = output_write(task, filter, brigade);
+ if (status != APR_SUCCESS) {
+ h2_task_rst(task, H2_ERR_INTERNAL_ERROR);
+ }
+ return status;
}
static apr_status_t h2_filter_read_response(ap_filter_t* filter,