* This allow recursive entering of the mutex from the saem thread,
* which is what we need in certain situations involving callbacks
*/
- AP_DEBUG_ASSERT(m);
+ ap_assert(m);
apr_threadkey_private_get(&mutex, thread_lock);
if (mutex == m->lock) {
*pacquired = 0;
return APR_SUCCESS;
}
- AP_DEBUG_ASSERT(m->lock);
+ ap_assert(m->lock);
status = apr_thread_mutex_lock(m->lock);
*pacquired = (status == APR_SUCCESS);
if (*pacquired) {
/* repeat until empty */
}
h2_ihash_clear(m->spurge);
- AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
}
}
static void h2_mplx_destroy(h2_mplx *m)
{
- AP_DEBUG_ASSERT(m);
+ ap_assert(m);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): destroy, tasks=%d",
m->id, (int)h2_ihash_count(m->tasks));
apr_status_t status = APR_SUCCESS;
apr_allocator_t *allocator = NULL;
h2_mplx *m;
- AP_DEBUG_ASSERT(conf);
+ ap_assert(conf);
status = apr_allocator_create(&allocator);
if (status != APR_SUCCESS) {
{
conn_rec *slave = NULL;
int reuse_slave = 0;
- apr_status_t status;
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_task(%s): destroy", task->id);
}
}
- /* The pool is cleared/destroyed which also closes all
- * allocated file handles. Give this count back to our
- * file handle pool. */
if (task->output.beam) {
- m->tx_handles_reserved +=
- h2_beam_get_files_beamed(task->output.beam);
h2_beam_on_produced(task->output.beam, NULL, NULL);
- status = h2_beam_shutdown(task->output.beam, APR_NONBLOCK_READ, 1);
- if (status != APR_SUCCESS){
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c,
- APLOGNO(03385) "h2_task(%s): output shutdown "
- "incomplete, beam empty=%d, holds proxies=%d",
- task->id,
- h2_beam_empty(task->output.beam),
- h2_beam_holds_proxies(task->output.beam));
- }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ APLOGNO(03385) "h2_task(%s): destroy "
+ "output beam empty=%d, holds proxies=%d",
+ task->id,
+ h2_beam_empty(task->output.beam),
+ h2_beam_holds_proxies(task->output.beam));
}
slave = task->c;
/* Remove mutex after, so that abort still finds cond to signal */
h2_beam_mutex_set(stream->input, NULL, NULL, NULL);
}
+ if (stream->output) {
+ m->tx_handles_reserved += h2_beam_get_files_beamed(stream->output);
+ }
h2_stream_cleanup(stream);
task = h2_ihash_get(m->tasks, stream->id);
if (task) {
h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%s): %s %s %s"
"[orph=%d/started=%d/done=%d/frozen=%d]",
task->id, task->request->method,
task->worker_done, task->frozen);
}
else if (task) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id);
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */
"->03198: h2_stream(%ld-NULL): NULL", m->id);
}
return 1;
static int task_abort_connection(void *ctx, void *val)
{
h2_task *task = val;
- if (task->c) {
- task->c->aborted = 1;
- }
- if (task->input.beam) {
- h2_beam_abort(task->input.beam);
- }
- if (task->output.beam) {
- h2_beam_abort(task->output.beam);
+ if (!task->worker_done) {
+ if (task->c) {
+ task->c->aborted = 1;
+ }
+ if (task->input.beam) {
+ h2_beam_abort(task->input.beam);
+ }
+ if (task->output.beam) {
+ h2_beam_abort(task->output.beam);
+ }
}
return 1;
}
h2_mplx *m = ctx;
h2_stream *stream = val;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
- "ready=%d",
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, ready=%d",
m->id, stream->id, stream->started, stream->scheduled,
h2_stream_is_ready(stream));
return 1;
}
+static int task_done_iter(void *ctx, void *val);
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
int acquired;
+ /* How to shut down a h2 connection:
+ * 1. tell the workers that no more tasks will come from us */
h2_workers_unregister(m->workers, m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- int i, wait_secs = 5;
+ int i, wait_secs = 60;
- if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): release_join with %d streams open, "
- "%d streams ready, %d tasks",
- m->id, (int)h2_ihash_count(m->streams),
- (int)h2_ihash_count(m->sready),
- (int)h2_ihash_count(m->tasks));
- h2_ihash_iter(m->streams, report_stream_iter, m);
- }
-
- /* disable WINDOW_UPDATE callbacks */
+ /* 2. disable WINDOW_UPDATEs and set the mplx to aborted, clear
+ * our TODO list and purge any streams we have collected */
h2_mplx_set_consumed_cb(m, NULL, NULL);
-
- if (!h2_ihash_empty(m->shold)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): start release_join with %d streams in hold",
- m->id, (int)h2_ihash_count(m->shold));
- }
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): start release_join with %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- }
-
+ h2_mplx_abort(m);
h2_iq_clear(m->q);
+ purge_streams(m);
+
+ /* 3. mark all slave connections as aborted and wakeup all sleeping
+ * tasks. Mark all still active streams as 'done'. m->streams has to
+ * be empty afterwards with streams either in
+ * a) m->shold because a task is still active
+ * b) m->spurge because task is done, or was not started */
+ h2_ihash_iter(m->tasks, task_abort_connection, m);
apr_thread_cond_broadcast(m->task_thawed);
while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
/* iterate until all streams have been removed */
}
- AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
-
- if (!h2_ihash_empty(m->shold)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 2. release_join with %d streams in "
- "hold, %d workers busy, %d tasks",
- m->id, (int)h2_ihash_count(m->shold),
- m->workers_busy,
- (int)h2_ihash_count(m->tasks));
- }
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 2. release_join with %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- }
+ ap_assert(h2_ihash_empty(m->streams));
+
+ /* 4. purge all streams we collected by marking them 'done' */
+ purge_streams(m);
- /* If we still have busy workers, we cannot release our memory
- * pool yet, as tasks have references to us.
- * Any operation on the task slave connection will from now on
- * be errored ECONNRESET/ABORTED, so processing them should fail
- * and workers *should* return in a timely fashion.
- */
+ /* 5. while workers are busy on this connection, meaning they
+ * are processing tasks from this connection, wait on them finishing
+ * to wake us and check again. Eventually, this has to succeed. */
+ m->join_wait = wait;
for (i = 0; m->workers_busy > 0; ++i) {
- h2_ihash_iter(m->tasks, task_abort_connection, m);
-
- m->join_wait = wait;
status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) {
- if (i > 0) {
- /* Oh, oh. Still we wait for assigned workers to report that
- * they are done. Unless we have a bug, a worker seems to be hanging.
- * If we exit now, all will be deallocated and the worker, once
- * it does return, will walk all over freed memory...
- */
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
- "h2_mplx(%ld): release, waiting for %d seconds now for "
- "%d h2_workers to return, have still %d tasks outstanding",
- m->id, i*wait_secs, m->workers_busy,
- (int)h2_ihash_count(m->tasks));
- if (i == 1) {
- h2_ihash_iter(m->tasks, task_print, m);
- }
- }
- h2_mplx_abort(m);
- apr_thread_cond_broadcast(m->task_thawed);
+ /* This can happen if we have very long running requests
+ * that do not time out on IO. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198)
+ "h2_mplx(%ld): release, waiting for %d seconds now for "
+ "%d h2_workers to return, have still %d tasks outstanding",
+ m->id, i*wait_secs, m->workers_busy,
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->shold, report_stream_iter, m);
+ h2_ihash_iter(m->tasks, task_print, m);
}
+ purge_streams(m);
}
+ m->join_wait = NULL;
- if (!h2_ihash_empty(m->tasks) && APLOGctrace1(m->c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ /* 6. All workers for this connection are done, we are in
+ * single-threaded processing now effectively. */
+ leave_mutex(m, acquired);
+
+ if (!h2_ihash_empty(m->tasks)) {
+ /* when we are here, we lost track of the tasks still present.
+ * this currently happens with mod_proxy_http2 when we shut
+ * down a h2_req_engine with tasks assigned... */
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): 3. release_join with %d tasks",
m->id, (int)h2_ihash_count(m->tasks));
h2_ihash_iter(m->tasks, task_print, m);
+
+ while (!h2_ihash_iter(m->tasks, task_done_iter, m)) {
+ /* iterate until all tasks have been removed */
+ }
}
- AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
- if (!h2_ihash_empty(m->spurge)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%ld): 3. release_join %d streams to purge",
- m->id, (int)h2_ihash_count(m->spurge));
- purge_streams(m);
- }
+
+ /* 7. With all tasks done, the stream hold should be empty and all
+ * remaining streams are ready for purging */
+ ap_assert(h2_ihash_empty(m->shold));
+ purge_streams(m);
- if (!h2_ihash_empty(m->tasks)) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
- "h2_mplx(%ld): release_join -> destroy, "
- "%d tasks still present",
- m->id, (int)h2_ihash_count(m->tasks));
- }
- leave_mutex(m, acquired);
+ /* 8. close the h2_req_enginge shed and self destruct */
+ h2_ngn_shed_destroy(m->ngn_shed);
+ m->ngn_shed = NULL;
h2_mplx_destroy(m);
- /* all gone */
}
return status;
}
{
int acquired;
- AP_DEBUG_ASSERT(m);
if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) {
m->aborted = 1;
h2_ngn_shed_abort(m->ngn_shed);
apr_status_t status = APR_SUCCESS;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld-%d): marking stream as done.",
h2_stream *s = NULL;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((enter_mutex(m, &acquired)) == APR_SUCCESS) {
s = h2_ihash_get(m->streams, id);
leave_mutex(m, acquired);
h2_stream *stream;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, beam->id);
if (stream) {
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
int do_registration = 0;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
status = APR_ECONNABORTED;
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (m->aborted) {
*has_more = 0;
stream = h2_ihash_get(m->shold, task->stream_id);
if (stream) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_mplx(%s): task_done, stream in hold",
- task->id);
+ "h2_mplx(%s): task_done, stream %d in hold",
+ task->id, stream->id);
/* We cannot destroy the stream here since this is
* called from a worker thread and freeing memory pools
* is only safe in the only thread using it (and its
task->id);
task_destroy(m, task, 0);
}
-
- if (m->join_wait) {
- apr_thread_cond_signal(m->join_wait);
- }
}
}
}
+static int task_done_iter(void *ctx, void *val)
+{
+ task_done((h2_mplx*)ctx, val, 0);
+ return 0;
+}
+
void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
{
int acquired;
if (enter_mutex(m, &acquired) == APR_SUCCESS) {
task_done(m, task, NULL);
--m->workers_busy;
+ if (m->join_wait) {
+ apr_thread_cond_signal(m->join_wait);
+ }
if (ptask) {
/* caller wants another task */
*ptask = next_stream_task(m);
h2_stream *stream;
size_t i, n;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
apr_status_t status;
int acquired;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
h2_stream *s = h2_ihash_get(m->streams, stream_id);
if (s) {
apr_status_t status;
int acquired, waiting = 1;
- AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
if (h2_ihash_empty(m->streams)) {
waiting = 0;
h2_stream *stream = ctx;
apr_status_t status;
+ ap_assert(stream->can_be_cleaned);
if (stream->files) {
apr_file_t *file;
int i;
void h2_stream_cleanup(h2_stream *stream)
{
- AP_DEBUG_ASSERT(stream);
+ apr_status_t status;
+
+ ap_assert(stream);
if (stream->out_buffer) {
+ /* remove any left over output buckets that may still have
+ * references into request pools */
apr_brigade_cleanup(stream->out_buffer);
}
- if (stream->input) {
- apr_status_t status;
- status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ, 1);
- if (status == APR_EAGAIN) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
- "h2_stream(%ld-%d): wait on input shutdown",
- stream->session->id, stream->id);
- status = h2_beam_shutdown(stream->input, APR_BLOCK_READ, 1);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
- "h2_stream(%ld-%d): input shutdown returned",
- stream->session->id, stream->id);
- }
+ h2_beam_abort(stream->input);
+ status = h2_beam_wait_empty(stream->input, APR_NONBLOCK_READ);
+ if (status == APR_EAGAIN) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ "h2_stream(%ld-%d): wait on input drain",
+ stream->session->id, stream->id);
+ status = h2_beam_wait_empty(stream->input, APR_BLOCK_READ);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c,
+ "h2_stream(%ld-%d): input drain returned",
+ stream->session->id, stream->id);
}
}
void h2_stream_destroy(h2_stream *stream)
{
- AP_DEBUG_ASSERT(stream);
+ ap_assert(stream);
+ ap_assert(!h2_mplx_stream_get(stream->session->mplx, stream->id));
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c,
"h2_stream(%ld-%d): destroy",
stream->session->id, stream->id);
+ stream->can_be_cleaned = 1;
if (stream->pool) {
apr_pool_destroy(stream->pool);
}
const char *name, size_t nlen,
const char *value, size_t vlen)
{
- AP_DEBUG_ASSERT(stream);
+ ap_assert(stream);
if (!stream->has_response) {
if (name[0] == ':') {
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status = APR_EINVAL;
- AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(stream->session);
- AP_DEBUG_ASSERT(stream->session->mplx);
+ ap_assert(stream);
+ ap_assert(stream->session);
+ ap_assert(stream->session->mplx);
if (!stream->scheduled) {
if (eos) {
apr_status_t h2_stream_close_input(h2_stream *stream)
{
conn_rec *c = stream->session->c;
- apr_status_t status = APR_SUCCESS, rv;
+ apr_status_t status;
+ apr_bucket_brigade *tmp;
+ apr_bucket *b;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
"h2_stream(%ld-%d): closing input",
return APR_ECONNRESET;
}
- if (!stream->input) {
- h2_beam_create(&stream->input, stream->pool, stream->id, "input", 0);
- }
-
+ tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
if (stream->trailers && !apr_is_empty_table(stream->trailers)) {
h2_headers *r = h2_headers_create(HTTP_OK, stream->trailers,
NULL, stream->pool);
- apr_bucket *b = h2_bucket_headers_create(c->bucket_alloc, r);
- apr_bucket_brigade *tmp;
-
- tmp = apr_brigade_create(stream->pool, c->bucket_alloc);
+ b = h2_bucket_headers_create(c->bucket_alloc, r);
APR_BRIGADE_INSERT_TAIL(tmp, b);
- status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
- apr_brigade_destroy(tmp);
-
stream->trailers = NULL;
}
- close_input(stream);
- rv = h2_beam_close(stream->input);
- return status ? status : rv;
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(tmp, b);
+ status = h2_beam_send(stream->input, tmp, APR_BLOCK_READ);
+ apr_brigade_destroy(tmp);
+ return status;
}
apr_status_t h2_stream_write_data(h2_stream *stream,
apr_status_t status = APR_SUCCESS;
apr_bucket_brigade *tmp;
- AP_DEBUG_ASSERT(stream);
+ ap_assert(stream);
if (!stream->input) {
return APR_EOF;
}