/* cleanup any buffered input */
status = h2_task_shutdown(task, 0);
if (status != APR_SUCCESS){
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO()
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO(03385)
"h2_task(%s): shutdown", task->id);
}
check_tx_free(m);
}
-static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
+static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
{
- /* Remove io from ready set, we will never submit it */
- h2_ihash_remove(m->ready_tasks, task->stream_id);
- if (task->worker_done) {
- /* already finished or not even started yet */
- h2_iq_remove(m->q, task->stream_id);
- task_destroy(m, task, 0);
- return 0;
+ h2_task *task;
+
+ h2_ihash_remove(m->streams, stream->id);
+ if (stream->input) {
+ apr_status_t status;
+ status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_stream(%ld-%d): wait on input shutdown",
+ m->id, stream->id);
+ status = h2_beam_shutdown(stream->input, APR_BLOCK_READ);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_stream(%ld-%d): input shutdown returned",
+ m->id, stream->id);
+ }
}
- else {
- /* cleanup once task is done */
- task->orphaned = 1;
- if (task->input.beam) {
- apr_status_t status;
- status = h2_beam_shutdown(task->input.beam, APR_NONBLOCK_READ);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_stream(%ld-%d): wait on input shutdown",
- m->id, task->stream_id);
- status = h2_beam_shutdown(task->input.beam, APR_BLOCK_READ);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_stream(%ld-%d): input shutdown returned",
- m->id, task->stream_id);
- }
- task->input.beam = NULL;
+
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ /* Remove task from ready set, we will never submit it */
+ h2_ihash_remove(m->ready_tasks, stream->id);
+
+ if (task->worker_done) {
+ /* already finished or not even started yet */
+ h2_iq_remove(m->q, task->stream_id);
+ task_destroy(m, task, 0);
}
- if (rst_error) {
- h2_task_rst(task, rst_error);
+ else {
+ /* task still running, cleanup once it is done */
+ task->orphaned = 1;
+ task->input.beam = NULL;
+ if (rst_error) {
+ h2_task_rst(task, rst_error);
+ }
}
- return 1;
}
}
static int stream_done_iter(void *ctx, void *val)
{
- return task_stream_done((h2_mplx*)ctx, val, 0);
+ h2_stream *stream = val;
+ stream_done((h2_mplx*)ctx, val, 0);
+ h2_stream_destroy(stream);
+ return 0;
}
static int task_print(void *ctx, void *val)
h2_iq_clear(m->q);
apr_thread_cond_broadcast(m->task_thawed);
- while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
- /* iterate until all ios have been orphaned or destroyed */
+ while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
+ /* iterate until all streams have been removed */
}
+ AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
/* If we still have busy workers, we cannot release our memory
* pool yet, as slave connections have child pools of their respective
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
- while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) {
- /* iterate until all ios have been orphaned or destroyed */
- }
if (APR_STATUS_IS_TIMEUP(status)) {
if (i > 0) {
/* Oh, oh. Still we wait for assigned workers to report that
*/
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_task *task = h2_ihash_get(m->tasks, stream_id);
-
- h2_ihash_remove(m->streams, stream_id);
- /* there should be an h2_io, once the stream has been scheduled
- * for processing, e.g. when we received all HEADERs. But when
- * a stream is cancelled very early, it will not exist. */
- if (task) {
+ h2_stream *stream = h2_ihash_get(m->streams, stream_id);
+ if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld-%d): marking stream task as done.",
+ "h2_mplx(%ld-%d): marking stream as done.",
m->id, stream_id);
- task_stream_done(m, task, rst_error);
+ stream_done(m, stream, rst_error);
}
leave_mutex(m, acquired);
}
h2_ngn_shed_done_ngn(m->ngn_shed, task->engine);
}
- if (!task->orphaned && m->redo_tasks
+ if (!m->aborted && !task->orphaned && m->redo_tasks
&& h2_ihash_get(m->redo_tasks, task->stream_id)) {
/* reset and schedule again */
h2_task_redo(task);