static int report_consumption(h2_bucket_beam *beam)
{
int rv = 0;
- if (apr_atomic_read32(&beam->cons_ev_pending)) {
- if (beam->cons_io_cb) {
- beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
- - beam->cons_bytes_reported);
- rv = 1;
- }
- beam->cons_bytes_reported = beam->received_bytes;
- apr_atomic_set32(&beam->cons_ev_pending, 0);
+ if (beam->cons_io_cb) {
+ beam->cons_io_cb(beam->cons_ctx, beam, beam->received_bytes
+ - beam->cons_bytes_reported);
+ rv = 1;
}
+ beam->cons_bytes_reported = beam->received_bytes;
return rv;
}
}
if (transferred_buckets > 0) {
- apr_atomic_set32(&beam->cons_ev_pending, 1);
if (beam->cons_ev_cb) {
beam->cons_ev_cb(beam->cons_ctx, beam);
}
int h2_beam_report_consumption(h2_bucket_beam *beam)
{
- if (apr_atomic_read32(&beam->cons_ev_pending)) {
- h2_beam_lock bl;
- if (enter_yellow(beam, &bl) == APR_SUCCESS) {
- int rv = report_consumption(beam);
- leave_yellow(beam, &bl);
- return rv;
- }
+ h2_beam_lock bl;
+ int rv = 0;
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
+ rv = report_consumption(beam);
+ leave_yellow(beam, &bl);
}
- return 0;
+ return rv;
}
void h2_beam_log(h2_bucket_beam *beam, conn_rec *c, int level, const char *msg)
h2_beam_mutex_enter *m_enter;
struct apr_thread_cond_t *m_cond;
- apr_uint32_t cons_ev_pending; /* != 0, consumer event pending */
apr_off_t cons_bytes_reported; /* amount of bytes reported as consumed */
h2_beam_ev_callback *cons_ev_cb;
h2_beam_io_callback *cons_io_cb;
void *cons_ctx;
- apr_uint32_t prod_ev_pending; /* != 0, producer event pending */
apr_off_t prod_bytes_reported; /* amount of bytes reported as produced */
h2_beam_io_callback *prod_io_cb;
void *prod_ctx;
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
- h2_task *task = ctx;
+ h2_stream *stream = ctx;
+ h2_task *task = stream->task;
if (length > 0 && task && task->assigned) {
h2_req_engine_out_consumed(task->assigned, task->c, length);
}
"h2_mplx(%s): out open", stream->task->id);
}
- h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream->task);
+ h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
h2_beam_on_produced(stream->output, output_produced, m);
beamed_count = h2_beam_get_files_beamed(stream->output);
if (m->tx_handles_reserved >= beamed_count) {
s = apr_table_get(req->headers, "Content-Length");
if (!s) {
- /* no content-length given */
+ /* HTTP/2 does not need a Content-Length for framing, but our
+ * internal request processing is used to HTTP/1.1, so we
+ * need to either add a Content-Length or a Transfer-Encoding
+ * if any content can be expected. */
if (!eos) {
/* We have not seen a content-length and have no eos,
* simulate a chunked encoding for our HTTP/1.1 infrastructure,
/* start pushed stream */
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp != NULL);
- status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+ status = h2_request_end_headers(stream->rtmp, stream->pool, 1);
if (status != APR_SUCCESS) {
return status;
}
/* request HEADER */
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp != NULL);
- status = h2_request_end_headers(stream->rtmp, stream->pool, 0);
+ status = h2_request_end_headers(stream->rtmp, stream->pool, eos);
if (status != APR_SUCCESS) {
return status;
}