/* do not count */
}
else if (APR_BUCKET_IS_FILE(b)) {
- /* if unread, has no real mem footprint. how to test? */
+ /* if unread, has no real mem footprint. */
}
else {
len += b->length;
return APR_SIZE_MAX;
}
-static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
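+/* true if neither the receiver's buffer nor the sender's list holds any buckets */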
+static int buffer_is_empty(h2_bucket_beam *beam)
{
- if (beam->timeout > 0) {
- return apr_thread_cond_timedwait(beam->cond, lock, beam->timeout);
+ return ((!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
+ && H2_BLIST_EMPTY(&beam->send_list));
+}
+
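+/* Wait, with the beam lock held, until the beam buffers have been emptied by
+ * the receiver. Non-blocking calls (or calls without a lock) get APR_EAGAIN,
+ * otherwise the result of the (timed) condition wait is returned. */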
+static apr_status_t wait_empty(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_thread_mutex_t *lock)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ while (!buffer_is_empty(beam) && APR_SUCCESS == rv) {
+ if (APR_BLOCK_READ != block || !lock) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, lock);
+ }
}
- else {
- return apr_thread_cond_wait(beam->cond, lock);
+ return rv;
+}
+
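+/* Wait, with the beam lock held, until data arrives in the beam. Gives
+ * APR_ECONNABORTED on an aborted beam, APR_EOF on a closed one, APR_EAGAIN
+ * for non-blocking calls, otherwise the result of the condition wait. */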
+static apr_status_t wait_not_empty(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_thread_mutex_t *lock)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ while (buffer_is_empty(beam) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (beam->closed) {
+ rv = APR_EOF;
+ }
+ else if (APR_BLOCK_READ != block || !lock) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, lock);
+ }
}
+ return rv;
}
-static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
- h2_beam_lock *pbl, apr_size_t *premain)
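+/* Wait, with the beam lock held, until there is space in the send buffer
+ * again. On return, *pspace_left holds the number of bytes that may be added. */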
+static apr_status_t wait_not_full(h2_bucket_beam *beam, apr_read_type_e block,
+ apr_size_t *pspace_left, apr_thread_mutex_t *lock)
{
- *premain = calc_space_left(beam);
- while (!beam->aborted && *premain <= 0
- && (block == APR_BLOCK_READ) && pbl->mutex) {
- apr_status_t status;
- report_prod_io(beam, 1, pbl);
- status = wait_cond(beam, pbl->mutex);
- if (APR_STATUS_IS_TIMEUP(status)) {
- return status;
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t left;
+
+ while (0 == (left = calc_space_left(beam)) && APR_SUCCESS == rv) {
+ if (beam->aborted) {
+ rv = APR_ECONNABORTED;
+ }
+ else if (block != APR_BLOCK_READ) {
+ rv = APR_EAGAIN;
+ }
+ else if (beam->timeout > 0) {
+ rv = apr_thread_cond_timedwait(beam->change, lock, beam->timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(beam->change, lock);
}
- r_purge_sent(beam);
- *premain = calc_space_left(beam);
}
- return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
+ *pspace_left = left;
+ return rv;
}
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
if (!bl.mutex) {
r_purge_sent(beam);
}
- else if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
+ else {
+ apr_thread_cond_broadcast(beam->change);
}
leave_yellow(beam, &bl);
}
{
if (!beam->closed) {
beam->closed = 1;
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
- }
+ apr_thread_cond_broadcast(beam->change);
}
return APR_SUCCESS;
}
apr_interval_time_t timeout)
{
h2_bucket_beam *beam;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
beam = apr_pcalloc(pool, sizeof(*beam));
if (!beam) {
beam->max_buf_size = max_buf_size;
beam->timeout = timeout;
- status = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT,
- pool);
- if (status == APR_SUCCESS) {
- status = apr_thread_cond_create(&beam->cond, pool);
- if (status == APR_SUCCESS) {
+ rv = apr_thread_mutex_create(&beam->lock, APR_THREAD_MUTEX_DEFAULT, pool);
+ if (APR_SUCCESS == rv) {
+ rv = apr_thread_cond_create(&beam->change, pool);
+ if (APR_SUCCESS == rv) {
apr_pool_pre_cleanup_register(pool, beam, beam_cleanup);
*pbeam = beam;
}
}
- return status;
+ return rv;
}
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
h2_blist_cleanup(&beam->send_list);
report_consumption(beam, &bl);
}
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
- }
+ apr_thread_cond_broadcast(beam->change);
leave_yellow(beam, &bl);
}
}
h2_beam_lock bl;
if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
- while (status == APR_SUCCESS
- && !H2_BLIST_EMPTY(&beam->send_list)
- && !H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
- if (block == APR_NONBLOCK_READ || !bl.mutex) {
- status = APR_EAGAIN;
- break;
- }
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
- }
- status = wait_cond(beam, bl.mutex);
- }
+ status = wait_empty(beam, block, bl.mutex);
leave_yellow(beam, &bl);
}
return status;
static apr_status_t append_bucket(h2_bucket_beam *beam,
apr_bucket *b,
apr_read_type_e block,
+ apr_size_t *pspace_left,
h2_beam_lock *pbl)
{
const char *data;
apr_size_t len;
- apr_size_t space_left = 0;
apr_status_t status;
+ int can_beam, check_len;
+
+ if (beam->aborted) {
+ return APR_ECONNABORTED;
+ }
if (APR_BUCKET_IS_METADATA(b)) {
if (APR_BUCKET_IS_EOS(b)) {
return APR_SUCCESS;
}
else if (APR_BUCKET_IS_FILE(b)) {
- /* file bucket lengths do not really count */
+ /* For file buckets the problem is their internal readpool that
+ * is used on the first read to allocate buffer/mmap.
+ * Since setting aside a file bucket will de-register the
+ * file cleanup function from the previous pool, we need to
+ * call that only from the sender thread.
+ *
+             * Currently, we do not handle file buckets with refcount > 1, as
+             * the beam is then not in complete control of the file's lifetime.
+             * Otherwise we would risk the bug that a file gets closed by the
+             * receiver while the sender or the beam still has buckets using it.
+ *
+ * Additionally, we allow callbacks to prevent beaming file
+ * handles across. The use case for this is to limit the number
+ * of open file handles and rather use a less efficient beam
+ * transport. */
+ apr_bucket_file *bf = b->data;
+ apr_file_t *fd = bf->fd;
+ can_beam = (bf->refcount.refcount == 1);
+ if (can_beam && beam->can_beam_fn) {
+ can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
+ }
+ check_len = !can_beam;
}
else {
- space_left = calc_space_left(beam);
- if (space_left > 0 && b->length == ((apr_size_t)-1)) {
+ if (b->length == ((apr_size_t)-1)) {
const char *data;
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status != APR_SUCCESS) {
return status;
}
}
-
- if (space_left <= 0) {
- status = r_wait_space(beam, block, pbl, &space_left);
- if (status != APR_SUCCESS) {
- return status;
- }
- if (space_left <= 0) {
- return APR_EAGAIN;
- }
- }
- /* space available, maybe need bucket split */
+ check_len = 1;
}
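+    /* if this bucket counts against the beam buffer, split it when it exceeds
+     * the space left and subtract its (possibly reduced) length */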
+ if (check_len) {
+ if (b->length > *pspace_left) {
+ apr_bucket_split(b, *pspace_left);
+ }
+ *pspace_left -= b->length;
+ }
/* The fundamental problem is that reading a sender bucket from
* a receiver thread is a total NO GO, because the bucket might use
apr_bucket_heap_make(b, data, len, NULL);
}
}
- else if (APR_BUCKET_IS_FILE(b)) {
- /* For file buckets the problem is their internal readpool that
- * is used on the first read to allocate buffer/mmap.
- * Since setting aside a file bucket will de-register the
- * file cleanup function from the previous pool, we need to
- * call that only from the sender thread.
- *
- * Currently, we do not handle file bucket with refcount > 1 as
- * the beam is then not in complete control of the file's lifetime.
- * Which results in the bug that a file get closed by the receiver
- * while the sender or the beam still have buckets using it.
- *
- * Additionally, we allow callbacks to prevent beaming file
- * handles across. The use case for this is to limit the number
- * of open file handles and rather use a less efficient beam
- * transport. */
- apr_bucket_file *bf = b->data;
- apr_file_t *fd = bf->fd;
- int can_beam = (bf->refcount.refcount == 1);
- if (can_beam && beam->can_beam_fn) {
- can_beam = beam->can_beam_fn(beam->can_beam_ctx, beam, fd);
- }
- if (can_beam) {
- status = apr_bucket_setaside(b, beam->send_pool);
- }
- /* else: enter ENOTIMPL case below */
+ else if (APR_BUCKET_IS_FILE(b) && can_beam) {
+ status = apr_bucket_setaside(b, beam->send_pool);
}
if (status == APR_ENOTIMPL) {
* a counter example).
* We do the read while in the sender thread, so that the bucket may
* use pools/allocators safely. */
- if (space_left < APR_BUCKET_BUFF_SIZE) {
- space_left = APR_BUCKET_BUFF_SIZE;
- }
- if (space_left < b->length) {
- apr_bucket_split(b, space_left);
- }
status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
if (status == APR_SUCCESS) {
status = apr_bucket_setaside(b, beam->send_pool);
APR_BUCKET_REMOVE(b);
H2_BLIST_INSERT_TAIL(&beam->send_list, b);
beam->sent_bytes += b->length;
-
+
return APR_SUCCESS;
}
apr_read_type_e block)
{
apr_bucket *b;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
+ apr_size_t space_left = 0;
h2_beam_lock bl;
/* Called from the sender thread to add buckets to the beam */
if (beam->aborted) {
move_to_hold(beam, sender_bb);
- status = APR_ECONNABORTED;
+ rv = APR_ECONNABORTED;
}
else if (sender_bb) {
- int force_report = !APR_BRIGADE_EMPTY(sender_bb);
- while (!APR_BRIGADE_EMPTY(sender_bb) && status == APR_SUCCESS) {
+ int force_report = !APR_BRIGADE_EMPTY(sender_bb);
+
+ space_left = calc_space_left(beam);
+ while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
+ if (space_left <= 0) {
+ rv = wait_not_full(beam, block, &space_left, bl.mutex);
+ if (APR_SUCCESS != rv) {
+ break;
+ }
+ }
b = APR_BRIGADE_FIRST(sender_bb);
- status = append_bucket(beam, b, block, &bl);
+ rv = append_bucket(beam, b, block, &space_left, &bl);
}
+
report_prod_io(beam, force_report, &bl);
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
- }
+ apr_thread_cond_broadcast(beam->change);
}
report_consumption(beam, &bl);
leave_yellow(beam, &bl);
}
- return status;
+ return rv;
}
apr_status_t h2_beam_receive(h2_bucket_beam *beam,
apr_bucket *bsender, *brecv, *ng;
int transferred = 0;
apr_status_t status = APR_SUCCESS;
- apr_off_t remain = readbytes;
+ apr_off_t remain;
int transferred_buckets = 0;
/* Called from the receiver thread to take buckets from the beam */
if (enter_yellow(beam, &bl) == APR_SUCCESS) {
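+        /* a requested length of 0 or less means: receive as much as possible */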
+ if (readbytes <= 0) {
+ readbytes = APR_SIZE_MAX;
+ }
+ remain = readbytes;
+
transfer:
if (beam->aborted) {
recv_buffer_cleanup(beam, &bl);
}
/* transfer enough buckets from our receiver brigade, if we have one */
- while (beam->recv_buffer
- && !APR_BRIGADE_EMPTY(beam->recv_buffer)
- && (readbytes <= 0 || remain >= 0)) {
+ while (remain >= 0
+ && beam->recv_buffer
+ && !APR_BRIGADE_EMPTY(beam->recv_buffer)) {
+
brecv = APR_BRIGADE_FIRST(beam->recv_buffer);
- if (readbytes > 0 && brecv->length > 0 && remain <= 0) {
+ if (brecv->length > 0 && remain <= 0) {
break;
}
APR_BUCKET_REMOVE(brecv);
/* transfer from our sender brigade, transforming sender buckets to
* receiver ones until we have enough */
- while (!H2_BLIST_EMPTY(&beam->send_list) && (readbytes <= 0 || remain >= 0)) {
- bsender = H2_BLIST_FIRST(&beam->send_list);
+ while (remain >= 0 && !H2_BLIST_EMPTY(&beam->send_list)) {
+
brecv = NULL;
-
- if (readbytes > 0 && bsender->length > 0 && remain <= 0) {
+ bsender = H2_BLIST_FIRST(&beam->send_list);
+ if (bsender->length > 0 && remain <= 0) {
break;
}
* been handed out. See also PR 59348 */
apr_bucket_file_enable_mmap(ng, 0);
#endif
- remain -= bsender->length;
- ++transferred;
APR_BUCKET_REMOVE(bsender);
H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+
+ remain -= bsender->length;
++transferred;
+ ++transferred_buckets;
continue;
}
else {
* receiver bucket references it any more. */
APR_BUCKET_REMOVE(bsender);
H2_BLIST_INSERT_TAIL(&beam->hold_list, bsender);
+
beam->received_bytes += bsender->length;
++transferred_buckets;
}
}
- if (readbytes > 0 && remain < 0) {
- /* too much, put some back */
+ if (remain < 0) {
+            /* too much, put some back into our recv_buffer */
remain = readbytes;
for (brecv = APR_BRIGADE_FIRST(bb);
brecv != APR_BRIGADE_SENTINEL(bb);
}
}
- if (transferred_buckets > 0) {
- if (beam->cons_ev_cb) {
- beam->cons_ev_cb(beam->cons_ctx, beam);
- }
- }
-
- if (beam->closed
- && (!beam->recv_buffer || APR_BRIGADE_EMPTY(beam->recv_buffer))
- && H2_BLIST_EMPTY(&beam->send_list)) {
+ if (beam->closed && buffer_is_empty(beam)) {
/* beam is closed and we have nothing more to receive */
if (!beam->close_sent) {
apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
}
}
- if (transferred) {
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
+ if (transferred_buckets > 0) {
+ if (beam->cons_ev_cb) {
+ beam->cons_ev_cb(beam->cons_ctx, beam);
}
- status = APR_SUCCESS;
}
- else if (beam->closed) {
- status = APR_EOF;
+
+ if (transferred) {
+ apr_thread_cond_broadcast(beam->change);
+ status = APR_SUCCESS;
}
- else if (block == APR_BLOCK_READ && bl.mutex && beam->cond) {
- status = wait_cond(beam, bl.mutex);
+ else {
+ status = wait_not_empty(beam, block, bl.mutex);
if (status != APR_SUCCESS) {
goto leave;
}
goto transfer;
}
- else {
- if (beam->cond) {
- apr_thread_cond_broadcast(beam->cond);
- }
- status = APR_EAGAIN;
- }
leave:
leave_yellow(beam, &bl);
}
return NULL;
}
-static apr_status_t add_data(h2_stream *stream, apr_off_t requested,
- apr_off_t *plen, int *peos, int *complete,
- h2_headers **pheaders)
+static apr_status_t add_buffered_data(h2_stream *stream, apr_off_t requested,
+ apr_off_t *plen, int *peos, int *is_all,
+ h2_headers **pheaders)
{
apr_bucket *b, *e;
*peos = 0;
*plen = 0;
- *complete = 0;
+ *is_all = 0;
if (pheaders) {
*pheaders = NULL;
}
- H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_data");
+ H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "add_buffered_data");
b = APR_BRIGADE_FIRST(stream->out_buffer);
while (b != APR_BRIGADE_SENTINEL(stream->out_buffer)) {
e = APR_BUCKET_NEXT(b);
}
b = e;
}
- *complete = 1;
+ *is_all = 1;
return APR_SUCCESS;
}
requested = (*plen > 0)? H2MIN(*plen, max_chunk) : max_chunk;
/* count the buffered data until eos or a headers bucket */
- status = add_data(stream, requested, plen, peos, &complete, pheaders);
+ status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
if (status == APR_EAGAIN) {
/* TODO: ugly, someone needs to retrieve the response first */
return APR_SUCCESS;
}
+    /* If we do not have enough buffered data to satisfy the requested
+     * length *and* we counted the _complete_ buffer (and did not stop in the
+     * middle because of metadata there), let's see if we can read more from
+     * the output beam */
missing = H2MIN(requested, stream->max_mem) - *plen;
if (complete && !*peos && missing > 0) {
+ apr_status_t rv = APR_EOF;
+
if (stream->output) {
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "pre");
- status = h2_beam_receive(stream->output, stream->out_buffer,
- APR_NONBLOCK_READ,
- stream->max_mem - *plen);
+ rv = h2_beam_receive(stream->output, stream->out_buffer,
+ APR_NONBLOCK_READ, stream->max_mem - *plen);
H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "post");
}
- else {
- status = APR_EOF;
- }
- if (APR_STATUS_IS_EOF(status)) {
+ if (rv == APR_SUCCESS) {
+ /* count the buffer again, now that we have read output */
+ status = add_buffered_data(stream, requested, plen, peos, &complete, pheaders);
+ }
+ else if (APR_STATUS_IS_EOF(rv)) {
apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->out_buffer, eos);
*peos = 1;
- status = APR_SUCCESS;
}
- else if (status == APR_SUCCESS) {
- /* do it again, now that we have gotten more */
- status = add_data(stream, requested, plen, peos, &complete, pheaders);
+ else if (APR_STATUS_IS_EAGAIN(rv)) {
+            /* we make this the status of this call only if there
+             * is no buffered data, see the check below */
+ }
+ else {
+ /* real error reading. Give this back directly, even though
+ * we may have something buffered. */
+ status = rv;
}
}
apr_bucket_brigade* bb)
{
apr_bucket *b;
- apr_status_t status = APR_SUCCESS;
+ apr_status_t rv = APR_SUCCESS;
int flush = 0, blocking;
if (task->frozen) {
return APR_SUCCESS;
}
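+    /* re-entry point: passing the data may take more than one attempt,
+     * e.g. after the output has been opened */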
+send:
/* we send block once we opened the output, so someone is there
* reading it *and* the task is not assigned to a h2_req_engine */
blocking = (!task->assigned && task->output.opened);
- if (!task->output.opened) {
- for (b = APR_BRIGADE_FIRST(bb);
- b != APR_BRIGADE_SENTINEL(bb);
- b = APR_BUCKET_NEXT(b)) {
- if (APR_BUCKET_IS_FLUSH(b)) {
- flush = 1;
- break;
- }
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FLUSH(b) || APR_BUCKET_IS_EOS(b) || AP_BUCKET_IS_EOR(b)) {
+ flush = 1;
+ break;
}
}
/* still have data buffered from previous attempt.
* setaside and append new data and try to pass the complete data */
if (!APR_BRIGADE_EMPTY(bb)) {
- status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ if (APR_SUCCESS != (rv = ap_save_brigade(f, &task->output.bb, &bb, task->pool))) {
+ goto out;
+ }
}
- if (status == APR_SUCCESS) {
- status = send_out(task, task->output.bb, blocking);
- }
+ rv = send_out(task, task->output.bb, blocking);
}
else {
- /* no data buffered here, try to pass the brigade directly */
- status = send_out(task, bb, blocking);
- if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
- /* could not write all, buffer the rest */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, APLOGNO(03405)
- "h2_slave_out(%s): saving brigade",
- task->id);
- status = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
- flush = 1;
+ /* no data buffered previously, pass brigade directly */
+ rv = send_out(task, bb, blocking);
+
+ if (APR_SUCCESS == rv && !APR_BRIGADE_EMPTY(bb)) {
+ /* output refused to buffer it all, time to open? */
+ if (!task->output.opened && APR_SUCCESS == (rv = open_output(task))) {
+                /* Make another attempt to send the data. With the output open,
+                 * the call may block and send all the data, so we do not need
+                 * to save the brigade. */
+ goto send;
+ }
+ else if (blocking && flush) {
+                /* blocking and a flush is pending: try again until all data is sent */
+ goto send;
+ }
+
+ if (APR_SUCCESS == rv) {
+ /* could not write all, buffer the rest */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, task->c, APLOGNO(03405)
+ "h2_slave_out(%s): saving brigade", task->id);
+ rv = ap_save_brigade(f, &task->output.bb, &bb, task->pool);
+ flush = 1;
+ }
}
}
- if (status == APR_SUCCESS && !task->output.opened && flush) {
+ if (APR_SUCCESS == rv && !task->output.opened && flush) {
/* got a flush or could not write all, time to tell someone to read */
- status = open_output(task);
+ rv = open_output(task);
}
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, task->c,
+out:
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, rv, task->c,
"h2_slave_out(%s): slave_out leave", task->id);
- return status;
+ return rv;
}
static apr_status_t output_finish(h2_task *task)