return rv;\
} } while(0)
-#define H2_MPLX_ENTER_ALWAYS(m) \
- apr_thread_mutex_lock(m->lock)
-
#define H2_MPLX_LEAVE(m) \
apr_thread_mutex_unlock(m->lock)
+#define H2_MPLX_ENTER_ALWAYS(m) \
+ apr_thread_mutex_lock(m->lock)
+
+#define H2_MPLX_ENTER_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_lock(m->lock)
-static void check_data_for(h2_mplx *m, int stream_id);
+#define H2_MPLX_LEAVE_MAYBE(m, lock) \
+ if (lock) apr_thread_mutex_unlock(m->lock)
+
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
h2_stream_cleanup(stream);
h2_iq_remove(m->q, stream->id);
+ h2_fifo_remove(m->readyq, stream);
h2_ihash_remove(m->streams, stream->id);
h2_ihash_add(m->shold, stream);
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
- m->readyq = h2_iq_create(m->pool, m->max_streams);
+
+ status = h2_fifo_set_create(&m->readyq, m->pool, m->max_streams);
+ if (status != APR_SUCCESS) {
+ apr_pool_destroy(m->pool);
+ return NULL;
+ }
m->workers = workers;
m->max_active = workers->max_workers;
/* Beam callback: a worker task produced output on a stream's beam.
 * The callback context is the h2_stream itself (registered in out_open),
 * so the stream no longer has to be looked up by id under the mplx lock.
 * Runs on a worker thread, therefore check_data_for() is asked to take
 * the mplx lock (lock=1) before signalling the master connection. */
static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    h2_stream *stream = ctx;
    h2_mplx *m = stream->session->mplx;

    (void)beam;
    (void)bytes;
    check_data_for(m, stream, 1);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
}
h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream);
- h2_beam_on_produced(stream->output, output_produced, m);
+ h2_beam_on_produced(stream->output, output_produced, stream);
if (stream->task->output.copy_files) {
h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL);
}
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
- check_data_for(m, stream->id);
+ check_data_for(m, stream, 0);
return status;
}
status = h2_beam_close(task->output.beam);
h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- check_data_for(m, task->stream_id);
+ check_data_for(m, stream, 0);
return status;
}
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (apr_atomic_read32(&m->event_pending) > 0) {
+ else if (h2_mplx_has_master_events(m)) {
status = APR_SUCCESS;
}
else {
return status;
}
-static void check_data_for(h2_mplx *m, int stream_id)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
{
- ap_assert(m);
- h2_iq_append(m->readyq, stream_id);
- apr_atomic_set32(&m->event_pending, 1);
- if (m->added_output) {
- apr_thread_cond_signal(m->added_output);
+ if (h2_fifo_push(m->readyq, stream) == APR_SUCCESS) {
+ apr_atomic_set32(&m->event_pending, 1);
+ H2_MPLX_ENTER_MAYBE(m, lock);
+ if (m->added_output) {
+ apr_thread_cond_signal(m->added_output);
+ }
+ H2_MPLX_LEAVE_MAYBE(m, lock);
}
}
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
/* already have a response */
- check_data_for(m, stream->id);
+ check_data_for(m, stream, 0);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
H2_STRM_MSG(stream, "process, add to readyq"));
}
if (stream->output) {
h2_beam_mutex_disable(stream->output);
}
- check_data_for(m, stream->id);
+ check_data_for(m, stream, 0);
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
stream_ev_callback *on_resume,
void *on_ctx)
{
- int ids[100];
h2_stream *stream;
- size_t i, n;
+ int n;
- H2_MPLX_ENTER(m);
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
apr_atomic_set32(&m->event_pending, 0);
- purge_streams(m);
-
+
/* update input windows for streams */
h2_ihash_iter(m->streams, report_consumption_iter, m);
- if (!h2_iq_empty(m->readyq)) {
- n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids));
- for (i = 0; i < n; ++i) {
- stream = h2_ihash_get(m->streams, ids[i]);
- if (stream) {
- H2_MPLX_LEAVE(m);
-
- on_resume(on_ctx, stream);
-
- H2_MPLX_ENTER(m);
- }
- }
+ n = h2_fifo_count(m->readyq);
+ while (n > 0
+ && (h2_fifo_try_pull(m->readyq, (void**)&stream) == APR_SUCCESS)) {
+ --n;
+ on_resume(on_ctx, stream);
}
- if (!h2_iq_empty(m->readyq)) {
- apr_atomic_set32(&m->event_pending, 1);
- }
-
+
+ H2_MPLX_ENTER(m);
+ purge_streams(m);
H2_MPLX_LEAVE(m);
+
return APR_SUCCESS;
}
-apr_status_t h2_mplx_keep_active(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
- H2_MPLX_ENTER(m);
-
- check_data_for(m, stream_id);
-
- H2_MPLX_LEAVE(m);
+ check_data_for(m, stream, 1);
return APR_SUCCESS;
}
if (h2_ihash_empty(m->streams)) {
waiting = 0;
}
- if (h2_iq_empty(m->readyq) && h2_iq_empty(m->q) && !m->tasks_active) {
+ if ((h2_fifo_count(m->readyq) == 0)
+ && h2_iq_empty(m->q) && !m->tasks_active) {
waiting = 0;
}
}
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx)
{
int i;
if (h2_iq_contains(q, sid)) {
- return;
+ return 0;
}
if (q->nelts >= q->nalloc) {
iq_grow(q, q->nalloc * 2);
/* bubble it to the front of the queue */
iq_bubble_up(q, i, q->head, cmp, ctx);
}
+ return 1;
}
-void h2_iq_append(h2_iqueue *q, int sid)
+int h2_iq_append(h2_iqueue *q, int sid)
{
- h2_iq_add(q, sid, NULL, NULL);
+ return h2_iq_add(q, sid, NULL, NULL);
}
int h2_iq_remove(h2_iqueue *q, int sid)
struct h2_fifo {
void **elems;
int nelems;
+ int set;
int head;
int count;
int aborted;
return APR_SUCCESS;
}
-apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+static int index_of(h2_fifo *fifo, void *elem)
+{
+ int i;
+
+ for (i = 0; i < fifo->count; ++i) {
+ if (elem == fifo->elems[nth_index(fifo, i)]) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+static apr_status_t create_int(h2_fifo **pfifo, apr_pool_t *pool,
+ int capacity, int as_set)
{
apr_status_t rv;
h2_fifo *fifo;
return APR_ENOMEM;
}
fifo->nelems = capacity;
+ fifo->set = as_set;
*pfifo = fifo;
apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
return APR_SUCCESS;
}
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 0);
+}
+
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ return create_int(pfifo, pool, capacity, 1);
+}
+
apr_status_t h2_fifo_term(h2_fifo *fifo)
{
apr_status_t rv;
}
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
- if (fifo->count == fifo->nelems) {
+ if (fifo->set && index_of(fifo, elem) >= 0) {
+ /* set mode, elem already member */
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EEXIST;
+ }
+ else if (fifo->count == fifo->nelems) {
if (block) {
while (fifo->count == fifo->nelems) {
if (fifo->aborted) {
* @param q the queue to append the id to
* @param sid the stream id to add
* @param cmp the comparator for sorting
- * @param ctx user data for comparator
+ * @param ctx user data for comparator
+ * @return != 0 iff id was not already there
*/
-void h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+int h2_iq_add(h2_iqueue *q, int sid, h2_iq_cmp *cmp, void *ctx);
/**
* Append the id to the queue if not already present.
*
* @param q the queue to append the id to
* @param sid the id to append
+ * @return != 0 iff id was not already there
*/
-void h2_iq_append(h2_iqueue *q, int sid);
+int h2_iq_append(h2_iqueue *q, int sid);
/**
* Remove the stream id from the queue. Return != 0 iff task
*/
typedef struct h2_fifo h2_fifo;
+/**
+ * Create a FIFO queue that can hold up to capacity elements. Elements can
+ * appear several times.
+ */
apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
+/**
+ * Create a FIFO set that can hold up to capacity elements. Elements only
+ * appear once. Pushing an element already present does not change the
+ * queue; such a push reports APR_EEXIST (see h2_fifo_push).
+ */
+apr_status_t h2_fifo_set_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+
apr_status_t h2_fifo_term(h2_fifo *fifo);
apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
int h2_fifo_count(h2_fifo *fifo);
+/**
+ * Push an element into the queue. Blocks if there is no capacity left.
+ *
+ * @param fifo the FIFO queue
+ * @param elem the element to push
+ * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue,
+ * APR_EEXIST when in set mode and elem already there.
+ */
apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);