/* repeat until empty */
}
h2_ihash_clear(m->spurge);
- ap_assert(h2_ihash_empty(m->spurge));
}
}
if (task) {
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%s): %s %s %s"
"[orph=%d/started=%d/done=%d/frozen=%d]",
task->id, task->request->method,
task->worker_done, task->frozen);
}
else if (task) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-NULL): NULL", m->id);
}
return 1;
static int task_abort_connection(void *ctx, void *val)
{
h2_task *task = val;
- if (task->c) {
- task->c->aborted = 1;
- }
- if (task->input.beam) {
- h2_beam_abort(task->input.beam);
- }
- if (task->output.beam) {
- h2_beam_abort(task->output.beam);
+ if (!task->worker_done) {
+ if (task->c) {
+ task->c->aborted = 1;
+ }
+ if (task->input.beam) {
+ h2_beam_abort(task->input.beam);
+ }
+ if (task->output.beam) {
+ h2_beam_abort(task->output.beam);
+ }
}
return 1;
}
h2_mplx *m = ctx;
h2_stream *stream = val;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
- "ready=%d",
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d",
m->id, stream->id, stream->started, stream->scheduled,
h2_stream_is_ready(stream));
return 1;
}
+static int task_done_iter(void *ctx, void *val);
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
int acquired;
+ /* How to shut down an h2 connection:
+ * 1. tell the workers that no more tasks will come from us */
h2_workers_unregister(m->workers, m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int i, wait_secs = 5;
+ int i, wait_secs = 60;
- if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join with %d streams open, "
- "%d streams ready, %d tasks",
- m->id, (int)h2_ihash_count(m->streams),
- (int)h2_ihash_count(m->sready),
- (int)h2_ihash_count(m->tasks));
- h2_ihash_iter(m->streams, report_stream_iter, m);
- }
-
- /* disable WINDOW_UPDATE callbacks */
+ /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
+ * our TODO list and purge any streams we have collected */
h2_mplx_set_consumed_cb(m, NULL, NULL);
-
- if (!h2_ihash_empty(m->shold)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): start release_join with %d streams in hold",
- m->id, (int)h2_ihash_count(m->shold));
- }
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): start release_join with %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- }
-
+ h2_mplx_abort(m);
h2_iq_clear(m->q);
+ purge_streams(m);
+
+ /* 3. mark all slave connections as aborted and wake up all sleeping
+ * tasks. Mark all still active streams as 'done'. m->streams has to
+ * be empty afterwards with streams either in
+ * a) m->shold because a task is still active
+ * b) m->spurge because task is done, or was not started */
+ h2_ihash_iter(m->tasks, task_abort_connection, m);
apr_thread_cond_broadcast(m->task_thawed);
while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
/* iterate until all streams have been removed */
}
ap_assert(h2_ihash_empty(m->streams));
-
- if (!h2_ihash_empty(m->shold)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 2. release_join with %d streams in "
- "hold, %d workers busy, %d tasks",
- m->id, (int)h2_ihash_count(m->shold),
- m->workers_busy,
- (int)h2_ihash_count(m->tasks));
- }
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 2. release_join with %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- }
+
+ /* 4. purge all streams we collected by marking them 'done' */
+ purge_streams(m);
- /* If we still have busy workers, we cannot release our memory
- * pool yet, as tasks have references to us.
- * Any operation on the task slave connection will from now on
- * be errored ECONNRESET/ABORTED, so processing them should fail
- * and workers *should* return in a timely fashion.
- */
+ /* 5. while workers are busy on this connection, meaning they
+ * are processing tasks from this connection, wait for them to finish
+ * and wake us, then check again. Eventually, this has to succeed. */
+ m->join_wait = wait;
for (i = 0; m->workers_busy > 0; ++i) {
- h2_ihash_iter(m->tasks, task_abort_connection, m);
-
- m->join_wait = wait;
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) {
- if (i > 0) {
- /* Oh, oh. Still we wait for assigned workers to report that
- * they are done. Unless we have a bug, a worker seems to be hanging.
- * If we exit now, all will be deallocated and the worker, once
- * it does return, will walk all over freed memory...
- */
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
- "h2_mplx(%ld): release, waiting for %d seconds now for "
- "%d h2_workers to return, have still %d tasks outstanding",
- m->id, i*wait_secs, m->workers_busy,
- (int)h2_ihash_count(m->tasks));
- if (i == 1) {
- h2_ihash_iter(m->tasks, task_print, m);
- }
- }
- h2_mplx_abort(m);
- apr_thread_cond_broadcast(m->task_thawed);
+ /* This can happen if we have very long running requests
+ * that do not time out on IO. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+ "h2_mplx(%ld): release, waiting for %d seconds now for "
+ "%d h2_workers to return, have still %d tasks outstanding",
+ m->id, i*wait_secs, m->workers_busy,
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->shold, report_stream_iter, m);
+ h2_ihash_iter(m->tasks, task_print, m);
}
+ purge_streams(m);
}
+ m->join_wait = NULL;
- if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ /* 6. All workers for this connection are done, so we are now
+ * effectively in single-threaded processing. */
+ leave_mutex(m, acquired);
+
+ if (!h2_ihash_empty(m->tasks)) {
+ /* When we get here, we have lost track of the tasks still present.
+ * This currently happens with mod_proxy_http2 when we shut
+ * down an h2_req_engine with tasks assigned... */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): 3. release_join with %d tasks",
m->id, (int)h2_ihash_count(m->tasks));
h2_ihash_iter(m->tasks, task_print, m);
+
+ while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
+ /* iterate until all tasks have been removed */
+ }
}
+
+ /* 7. With all tasks done, the stream hold should be empty and all
+ * remaining streams are ready for purging */
ap_assert(h2_ihash_empty(m->shold));
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 3. release_join %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- purge_streams(m);
- }
+ purge_streams(m);
- if (!h2_ihash_empty(m->tasks)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): release_join -> destroy, "
- "%d tasks still present",
- m->id, (int)h2_ihash_count(m->tasks));
- }
- leave_mutex(m, acquired);
+ /* 8. close the h2_req_engine shed and self destruct */
+ h2_ngn_shed_destroy(m->ngn_shed);
+ m->ngn_shed = NULL;
h2_mplx_destroy(m);
- /* all gone */
}
return status;
}
stream = h2_ihash_get(m->shold, task->stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): task_done, stream in hold",
- task->id);
+ "h2_mplx(%s): task_done, stream %d in hold",
+ task->id, stream->id);
/* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools
* is only safe in the only thread using it (and its
task->id);
task_destroy(m, task, 0);
}
-
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
}
}
}
+static int task_done_iter(void *ctx, void *val)
+{
+ task_done((h2_mplx*)ctx, val, 0);
+ return 0;
+}
+
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
task_done(m, task, NULL);
--m->workers_busy;
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
if (ptask) {
/* caller wants another task */
*ptask = next_stream_task(m);