return 0;
}
-static void purge_streams(h2_mplx *m)
+static void purge_streams(h2_mplx *m, int lock)
{
if (!h2_ihash_empty(m->spurge)) {
+ H2_MPLX_ENTER_MAYBE(m, lock);
while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) {
/* repeat until empty */
}
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
status = APR_SUCCESS;
}
else {
- purge_streams(m);
+ purge_streams(m, 0);
h2_ihash_iter(m->streams, report_consumption_iter, m);
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
}
stream = h2_ihash_get(m->streams, task->stream_id);
- if (stream && !m->aborted && h2_ihash_get(m->sredo, stream->id)) {
- /* reset and schedule again */
- h2_task_redo(task);
- h2_ihash_remove(m->sredo, stream->id);
- h2_iq_add(m->q, stream->id, NULL, NULL);
- return;
- }
-
if (stream) {
- /* stream not cleaned up, stay around */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- H2_STRM_MSG(stream, "task_done, stream open"));
- /* more data will not arrive, resume the stream */
- if (stream->input) {
- h2_beam_mutex_disable(stream->input);
- h2_beam_leave(stream->input);
+ /* stream not done yet. */
+ if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) {
+ /* reset and schedule again */
+ h2_task_redo(task);
+ h2_ihash_remove(m->sredo, stream->id);
+ h2_iq_add(m->q, stream->id, NULL, NULL);
}
- if (stream->output) {
- h2_beam_mutex_disable(stream->output);
+ else {
+ /* stream not cleaned up, stay around */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ H2_STRM_MSG(stream, "task_done, stream open"));
+ /* more data will not arrive, resume the stream */
+ check_data_for(m, stream, 0);
+
+ if (stream->input) {
+ h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
+ }
+ if (stream->output) {
+ h2_beam_mutex_disable(stream->output);
+ }
}
- check_data_for(m, stream, 0);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
+ /* stream is done, was just waiting for this. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
H2_STRM_MSG(stream, "task_done, in hold"));
- /* stream was just waiting for us. */
if (stream->input) {
- h2_beam_mutex_disable(stream->input);
h2_beam_leave(stream->input);
+ h2_beam_mutex_disable(stream->input);
}
if (stream->output) {
h2_beam_mutex_disable(stream->output);
task_done(m, task, NULL);
--m->tasks_active;
+
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
&& !h2_ihash_get(m->sredo, stream->id)) {
h2_ihash_add(m->sredo, stream);
}
+
if (task->engine) {
/* cannot report that as done until engine returns */
}
apr_atomic_set32(&m->event_pending, 0);
/* update input windows for streams */
- h2_ihash_iter(m->streams, report_consumption_iter, m);
+ h2_ihash_iter(m->streams, report_consumption_iter, m);
+ purge_streams(m, 1);
n = h2_fifo_count(m->readyq);
while (n > 0
on_resume(on_ctx, stream);
}
- H2_MPLX_ENTER(m);
- purge_streams(m);
- H2_MPLX_LEAVE(m);
-
return APR_SUCCESS;
}
return fifo_push(fifo, elem, 0);
}
+static void *pull_head(h2_fifo *fifo)
+{
+ void *elem;
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ return elem;
+}
+
static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
{
apr_status_t rv;
}
ap_assert(fifo->count > 0);
- *pelem = fifo->elems[fifo->head];
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
+ *pelem = pull_head(fifo);
+
apr_thread_mutex_unlock(fifo->lock);
}
return rv;
}
ap_assert(fifo->count > 0);
- elem = fifo->elems[fifo->head];
+ elem = pull_head(fifo);
+ apr_thread_mutex_unlock(fifo->lock);
+
switch (fn(elem, ctx)) {
case H2_FIFO_OP_PULL:
- --fifo->count;
- if (fifo->count > 0) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count+1 == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- }
break;
case H2_FIFO_OP_REPUSH:
- if (fifo->count > 1) {
- fifo->head = nth_index(fifo, 1);
- if (fifo->count < fifo->nelems) {
- fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
- }
- }
+ return h2_fifo_push(fifo, elem);
break;
}
- apr_thread_mutex_unlock(fifo->lock);
}
return rv;
}