r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
report_consumption(beam, 0);
+ while (!H2_BPROXY_LIST_EMPTY(&beam->proxies)) {
+ h2_beam_proxy *proxy = H2_BPROXY_LIST_FIRST(&beam->proxies);
+ H2_BPROXY_REMOVE(proxy);
+ proxy->beam = NULL;
+ proxy->bred = NULL;
+ }
h2_blist_cleanup(&beam->purge);
h2_blist_cleanup(&beam->hold);
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}
-apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block)
+apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block,
+ int clear_buffers)
{
apr_status_t status;
h2_beam_lock bl;
if ((status = enter_yellow(beam, &bl)) == APR_SUCCESS) {
- r_purge_reds(beam);
- h2_blist_cleanup(&beam->red);
+ if (clear_buffers) {
+ r_purge_reds(beam);
+ h2_blist_cleanup(&beam->red);
+ }
beam_close(beam);
- report_consumption(beam, 0);
while (status == APR_SUCCESS
&& (!H2_BPROXY_LIST_EMPTY(&beam->proxies)
apr_status_t h2_beam_close(h2_bucket_beam *beam);
/**
- * Empty any buffered data and return APR_SUCCESS when all buckets
- * in transit have been handled. When called with APR_BLOCK_READ and
- * with a mutex installed, will wait until this is the case. Otherwise
- * APR_EAGAIN is returned.
+ * Return APR_SUCCESS when all buckets in transit have been handled.
+ * When called with APR_BLOCK_READ and a mutex set, will wait until the green
+ * side has consumed all data. Otherwise APR_EAGAIN is returned.
+ * With clear_buffers set, any queued data is discarded.
* If a timeout is set on the beam, waiting might also time out and
* return APR_ETIMEUP.
*
* Call from the red side only.
*/
-apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block);
+apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block,
+ int clear_buffers);
void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
h2_ihash_remove(m->spurge, stream->id);
h2_stream_destroy(stream);
if (task) {
- task_destroy(m, task, 0);
+ task_destroy(m, task, 1);
}
return 0;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_task(%s): destroy", task->id);
- /* cleanup any buffered input */
- if (task->input.beam) {
- status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
- if (status != APR_SUCCESS){
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO(03385)
- "h2_task(%s): input shutdown", task->id);
- }
- }
-
if (called_from_master) {
/* Process outstanding events before destruction */
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
h2_beam_on_produced(task->output.beam, NULL, NULL);
+ status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
+ if (status != APR_SUCCESS){
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c,
+ APLOGNO(03385) "h2_task(%s): output shutdown "
+ "incomplete", task->id);
+ }
}
slave = task->c;
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
+ h2_beam_on_consumed(stream->input, NULL, NULL);
+ h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
}
h2_stream_cleanup(stream);
"h2_mplx(%ld-%d): marking stream as done.",
m->id, stream->id);
stream_done(m, stream, stream->rst_error);
+ purge_streams(m);
leave_mutex(m, acquired);
}
return status;
status = APR_SUCCESS;
}
else {
+ purge_streams(m);
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
if (APLOGctrace2(m->c)) {
}
}
else {
- /* stream done, was it placed in hold? */
+ /* stream no longer active, was it placed in hold? */
stream = h2_ihash_get(m->shold, task->stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream in hold",
task->id);
- stream->response = NULL; /* ref from task memory */
/* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools
* is only safe in the only thread using it (and its
* parent pool / allocator) */
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
- task_destroy(m, task, 0);
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
}
if (stream->input) {
apr_status_t status;
- status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
+ status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1);
if (status == APR_EAGAIN) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
"h2_stream(%ld-%d): wait on input shutdown",
stream->session->id, stream->id);
- status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
+ status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
"h2_stream(%ld-%d): input shutdown returned",
stream->session->id, stream->id);