struct h2_task;
-typedef apr_status_t h2_io_data_cb(void *ctx,
- const char *data, apr_size_t len);
+typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_size_t len);
+
+typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx);
typedef struct h2_io h2_io;
m->bucket_alloc = apr_bucket_alloc_create(m->pool);
- m->q = h2_tq_create(m->id, m->pool);
+ m->q = h2_tq_create(m->pool, 23);
m->stream_ios = h2_io_set_create(m->pool);
m->ready_ios = h2_io_set_create(m->pool);
m->closed = h2_stream_set_create(m->pool);
}
}
-apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task)
+typedef struct {
+ h2_stream_pri_cmp *cmp;
+ void *ctx;
+} cmp_ctx;
+
+static int task_cmp(h2_task *t1, h2_task *t2, void *ctx)
+{
+ cmp_ctx *x = ctx;
+ return x->cmp(t1->stream_id, t2->stream_id, x->ctx);
+}
+
+apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
+{
+ apr_status_t status;
+
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ cmp_ctx x;
+
+ x.cmp = cmp;
+ x.ctx = ctx;
+ h2_tq_sort(m->q, task_cmp, &x);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): reprioritize tasks", m->id);
+ apr_thread_mutex_unlock(m->lock);
+ }
+ workers_register(m);
+ return status;
+}
+
+apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task,
+ h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
+
AP_DEBUG_ASSERT(m);
if (m->aborted) {
return APR_ECONNABORTED;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- /* TODO: needs to sort queue by priority */
+ cmp_ctx x;
+
+ x.cmp = cmp;
+ x.ctx = ctx;
+ h2_tq_add(m->q, task, task_cmp, &x);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx: do task(%s)", task->id);
- h2_tq_append(m->q, task);
apr_thread_mutex_unlock(m->lock);
}
workers_register(m);
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- task = h2_tq_pop_first(m->q);
+ task = h2_tq_shift(m->q);
if (task) {
h2_task_set_started(task);
}
int file_handles_allowed;
};
+
+
/*******************************************************************************
* Object lifecycle and information.
******************************************************************************/
******************************************************************************/
/**
- * Perform the task on the given stream.
+ * Schedule a task for execution.
+ *
+ * @param m the multiplexer
+ * @param task the task to schedule
+ * @param cmp the stream priority compare function
+ * @param ctx context data for the compare function
+ */
+apr_status_t h2_mplx_do_task(h2_mplx *m, struct h2_task *task,
+ h2_stream_pri_cmp *cmp, void *ctx);
+
+/**
+ * Stream priorities have changed, reschedule pending tasks.
+ *
+ * @param m the multiplexer
+ * @param cmp the stream priority compare function
+ * @param ctx context data for the compare function
*/
-apr_status_t h2_mplx_do_task(h2_mplx *mplx, struct h2_task *task);
+apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx);
struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more);
return NGHTTP2_ERR_INVALID_STREAM_ID;
}
+/**
+ * Determine the importance of streams when scheduling tasks.
+ * - if both stream depend on the same one, compare weights
+ * - if one stream is closer to the root, prioritize that one
+ * - if both are on the same level, use the weight of their root
+ * level ancestors
+ */
+static int spri_cmp(int sid1, nghttp2_stream *s1,
+ int sid2, nghttp2_stream *s2, h2_session *session)
+{
+ nghttp2_stream *p1, *p2;
+
+ p1 = nghttp2_stream_get_parent(s1);
+ p2 = nghttp2_stream_get_parent(s2);
+
+ if (p1 == p2) {
+ int32_t w1, w2;
+
+ w1 = nghttp2_stream_get_weight(s1);
+ w2 = nghttp2_stream_get_weight(s2);
+ return w2 - w1;
+ }
+ else if (!p1) {
+ /* stream 1 closer to root */
+ return -1;
+ }
+ else if (!p2) {
+ /* stream 2 closer to root */
+ return 1;
+ }
+ return spri_cmp(sid1, p1, sid2, p2, session);
+}
+
+static int stream_pri_cmp(int sid1, int sid2, void *ctx)
+{
+ h2_session *session = ctx;
+ nghttp2_stream *s1, *s2;
+
+ s1 = nghttp2_session_find_stream(session->ngh2, sid1);
+ s2 = nghttp2_session_find_stream(session->ngh2, sid2);
+
+ if (s1 == s2) {
+ return 0;
+ }
+ else if (!s1) {
+ return 1;
+ }
+ else if (!s2) {
+ return -1;
+ }
+ return spri_cmp(sid1, s1, sid2, s2, session);
+}
+
static apr_status_t stream_end_headers(h2_session *session,
h2_stream *stream, int eos)
{
(void)session;
- return h2_stream_write_eoh(stream, eos);
+ return h2_stream_schedule(stream, eos, stream_pri_cmp, session);
}
static apr_status_t send_data(h2_session *session, const char *data,
break;
}
case NGHTTP2_PRIORITY: {
+ session->reprioritize = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
"h2_session: stream(%ld-%d): PRIORITY frame "
" weight=%d, dependsOn=%d, exclusive=%d",
AP_DEBUG_ASSERT(session);
+ if (session->reprioritize) {
+ h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session);
+ }
+
/* Check that any pending window updates are sent. */
status = h2_session_update_windows(session);
if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
* of 'h2c', NULL otherwise */
int aborted; /* this session is being aborted */
int flush; /* if != 0, flush output on next occasion */
+ int reprioritize; /* scheduled streams priority needs to
+ * be re-evaluated */
apr_size_t frames_received; /* number of http/2 frames received */
apr_size_t max_stream_count; /* max number of open streams */
apr_size_t max_stream_mem; /* max buffer memory for a single stream */
return status;
}
-apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos)
+apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
+ h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
AP_DEBUG_ASSERT(stream);
status = h2_request_end_headers(stream->request,
stream->m, stream->task, eos);
if (status == APR_SUCCESS) {
- status = h2_mplx_do_task(stream->m, stream->task);
+ status = h2_mplx_do_task(stream->m, stream->task, cmp, ctx);
}
if (eos) {
status = h2_stream_write_eos(stream);
const char *name, size_t nlen,
const char *value, size_t vlen);
-apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos);
+apr_status_t h2_stream_schedule(h2_stream *stream, int eos,
+ h2_stream_pri_cmp *cmp, void *ctx);
apr_status_t h2_stream_write_data(h2_stream *stream,
const char *data, size_t len);
return NULL;
}
- APR_RING_ELEM_INIT(task, link);
-
task->id = apr_psprintf(stream_pool, "%ld-%d", session_id, stream_id);
task->stream_id = stream_id;
task->mplx = mplx;
typedef struct h2_task h2_task;
struct h2_task {
- /** Links to the rest of the tasks */
- APR_RING_ENTRY(h2_task) link;
-
const char *id;
int stream_id;
struct h2_mplx *mplx;
struct apr_thread_cond_t *io; /* used to wait for events on */
};
-/**
- * The magic pointer value that indicates the head of a h2_task list
- * @param b The task list
- * @return The magic pointer value
- */
-#define H2_TASK_LIST_SENTINEL(b) APR_RING_SENTINEL((b), h2_task, link)
-
-/**
- * Determine if the task list is empty
- * @param b The list to check
- * @return true or false
- */
-#define H2_TASK_LIST_EMPTY(b) APR_RING_EMPTY((b), h2_task, link)
-
-/**
- * Return the first task in a list
- * @param b The list to query
- * @return The first task in the list
- */
-#define H2_TASK_LIST_FIRST(b) APR_RING_FIRST(b)
-
-/**
- * Return the last task in a list
- * @param b The list to query
- * @return The last task int he list
- */
-#define H2_TASK_LIST_LAST(b) APR_RING_LAST(b)
-
-/**
- * Insert a single task at the front of a list
- * @param b The list to add to
- * @param e The task to insert
- */
-#define H2_TASK_LIST_INSERT_HEAD(b, e) do { \
- h2_task *ap__b = (e); \
- APR_RING_INSERT_HEAD((b), ap__b, h2_task, link); \
-} while (0)
-
-/**
- * Insert a single task at the end of a list
- * @param b The list to add to
- * @param e The task to insert
- */
-#define H2_TASK_LIST_INSERT_TAIL(b, e) do { \
- h2_task *ap__b = (e); \
- APR_RING_INSERT_TAIL((b), ap__b, h2_task, link); \
-} while (0)
-
-/**
- * Get the next task in the list
- * @param e The current task
- * @return The next task
- */
-#define H2_TASK_NEXT(e) APR_RING_NEXT((e), link)
-/**
- * Get the previous task in the list
- * @param e The current task
- * @return The previous task
- */
-#define H2_TASK_PREV(e) APR_RING_PREV((e), link)
-
-/**
- * Remove a task from its list
- * @param e The task to remove
- */
-#define H2_TASK_REMOVE(e) APR_RING_REMOVE((e), link)
-
-
h2_task *h2_task_create(long session_id, int stream_id,
apr_pool_t *pool, struct h2_mplx *mplx,
conn_rec *c);
#include "h2_task_queue.h"
-h2_task_queue *h2_tq_create(long id, apr_pool_t *pool)
+static void grow(h2_task_queue *q, int nlen);
+static h2_task *qrm(h2_task_queue *q, int index);
+static void tqsort(h2_task_queue *q, int left, int right,
+ h2_tq_cmp *cmp, void *ctx);
+
+h2_task_queue *h2_tq_create(apr_pool_t *pool, int capacity)
{
h2_task_queue *q = apr_pcalloc(pool, sizeof(h2_task_queue));
if (q) {
- q->id = id;
- APR_RING_ELEM_INIT(q, link);
- APR_RING_INIT(&q->tasks, h2_task, link);
+ q->pool = pool;
+ grow(q, capacity);
+ q->nelts = 0;
}
return q;
}
void h2_tq_destroy(h2_task_queue *q)
{
- while (!H2_TASK_LIST_EMPTY(&q->tasks)) {
- h2_task *task = H2_TASK_LIST_FIRST(&q->tasks);
- H2_TASK_REMOVE(task);
+}
+
+int h2_tq_empty(h2_task_queue *q)
+{
+ return q->nelts == 0;
+}
+
+void h2_tq_add(h2_task_queue *q, struct h2_task *task,
+ h2_tq_cmp *cmp, void *ctx)
+{
+ int i;
+
+ if (q->nelts >= q->nalloc) {
+ grow(q, q->nalloc * 2);
}
+
+ /* Assume tasks most commonly arrive in ascending order */
+ for (i = q->nelts; i > 0; --i) {
+ if (cmp(q->elts[i-1], task, ctx) <= 0) {
+ if (i < q->nelts) {
+ memmove(q->elts+i+1, q->elts+i, q->nelts - i);
+ }
+ q->elts[i] = task;
+ q->nelts++;
+ return;
+ }
+ }
+ /* insert at front */
+ if (q->nelts) {
+ memmove(q->elts+1, q->elts, q->nelts);
+ }
+ q->elts[q->nelts++] = task;
}
-static int in_list(h2_task_queue *q, h2_task *task)
+void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx)
+{
+ tqsort(q, 0, q->nelts - 1, cmp, ctx);
+}
+
+
+apr_status_t h2_tq_remove(h2_task_queue *q, struct h2_task *task)
{
- h2_task *e;
- for (e = H2_TASK_LIST_FIRST(&q->tasks);
- e != H2_TASK_LIST_SENTINEL(&q->tasks);
- e = H2_TASK_NEXT(e)) {
- if (e == task) {
- return 1;
+ int i;
+
+ for (i = 0; i < q->nelts; ++i) {
+ if (task == q->elts[i]) {
+ qrm(q, i);
+ return APR_SUCCESS;
}
}
- return 0;
+ return APR_NOTFOUND;
}
-int h2_tq_empty(h2_task_queue *q)
+h2_task *h2_tq_shift(h2_task_queue *q)
{
- return H2_TASK_LIST_EMPTY(&q->tasks);
+ return qrm(q, 0);
}
-void h2_tq_append(h2_task_queue *q, struct h2_task *task)
+static void grow(h2_task_queue *q, int nlen)
{
- H2_TASK_LIST_INSERT_TAIL(&q->tasks, task);
+ AP_DEBUG_ASSERT(q->nalloc <= nlen);
+ if (nlen > q->nalloc) {
+ h2_task **nq = apr_pcalloc(q->pool, sizeof(h2_task *) * nlen);
+ if (q->nelts > 0) {
+ memmove(nq, q->elts, sizeof(h2_task *) * q->nelts);
+ }
+ q->elts = nq;
+ q->nalloc = nlen;
+ }
}
-apr_status_t h2_tq_remove(h2_task_queue *q, struct h2_task *task)
+static h2_task *qrm(h2_task_queue *q, int index)
{
- if (in_list(q, task)) {
- H2_TASK_REMOVE(task);
- return APR_SUCCESS;
+ if (index >= q->nelts) {
+ return NULL;
}
- return APR_NOTFOUND;
+ else if (index == q->nelts - 1) {
+ q->nelts--;
+ return q->elts[index];
+ }
+ else {
+ h2_task *t = q->elts[index];
+ q->nelts--;
+ memmove(q->elts+index, q->elts+index+1,
+ sizeof(q->elts[0]) * (q->nelts - index));
+ return t;
+ }
+}
+
+static void tqswap(h2_task_queue *q, int i, int j)
+{
+ h2_task *t = q->elts[i];
+ q->elts[i] = q->elts[j];
+ q->elts[j] = t;
}
-h2_task *h2_tq_pop_first(h2_task_queue *q)
+static void tqsort(h2_task_queue *q, int left, int right,
+ h2_tq_cmp *cmp, void *ctx)
{
- if (!H2_TASK_LIST_EMPTY(&q->tasks)) {
- h2_task *task = H2_TASK_LIST_FIRST(&q->tasks);
- H2_TASK_REMOVE(task);
- return task;
+ int i, last;
+
+ if (left >= right)
+ return;
+ tqswap(q, left, (left + right)/2);
+ last = left;
+ for (i = left+1; i <= right; i++) {
+ if ((*cmp)(q->elts[i], q->elts[left], ctx) < 0) {
+ tqswap(q, ++last, i);
+ }
}
- return NULL;
+ tqswap(q, left, last);
+ tqsort(q, left, last-1, cmp, ctx);
+ tqsort(q, last+1, right, cmp, ctx);
}
+
typedef struct h2_task_queue h2_task_queue;
struct h2_task_queue {
- APR_RING_ENTRY(h2_task_queue) link;
- APR_RING_HEAD(h2_tasks, h2_task) tasks;
- long id;
+ apr_pool_t *pool;
+ struct h2_task **elts;
+ int nelts;
+ int nalloc;
};
/**
* @param id the identifier of the queue
* @param pool the memory pool
*/
-h2_task_queue *h2_tq_create(long id, apr_pool_t *pool);
+h2_task_queue *h2_tq_create(apr_pool_t *pool, int capacity);
/**
* Release all queue tasks.
*/
int h2_tq_empty(h2_task_queue *q);
+typedef int h2_tq_cmp(struct h2_task *t1, struct h2_task *t2, void *ctx);
+
/**
- * Append the task to the end of the queue.
+ * Add the task to the sorted queue. For optimiztation, it is assumed
+ * that the order of the existing tasks has not changed.
+ *
* @param q the queue to append the task to
- * @param task the task to append
- */
-void h2_tq_append(h2_task_queue *q, struct h2_task *task);
+ * @param task the task to add
+ * @param cmp the compare function for sorting
+ * @param ctx user data for the compare function
+ */
+void h2_tq_add(h2_task_queue *q, struct h2_task *task,
+ h2_tq_cmp *cmp, void *ctx);
+
+/**
+ * Sort the tasks queue again. Useful to call if the task order
+ * has changed.
+ *
+ * @param q the queue to sort
+ * @param cmp the compare function for sorting
+ * @param ctx user data for the compare function
+ */
+void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx);
/**
* Remove a task from the queue. Return APR_SUCCESS if the task
apr_status_t h2_tq_remove(h2_task_queue *q, struct h2_task *task);
/**
- * Get the first task from the queue or NULL if the queue is empty. The
- * task will be removed.
- * @param q the queue to pop the first task from
- */
-h2_task *h2_tq_pop_first(h2_task_queue *q);
-
-/*******************************************************************************
- * Queue Manipulation.
- ******************************************************************************/
-
-/**
- * The magic pointer value that indicates the head of a h2_task_queue list
- * @param b The queue list
- * @return The magic pointer value
- */
-#define H2_TQ_LIST_SENTINEL(b) APR_RING_SENTINEL((b), h2_task_queue, link)
-
-/**
- * Determine if the queue list is empty
- * @param b The list to check
- * @return true or false
- */
-#define H2_TQ_LIST_EMPTY(b) APR_RING_EMPTY((b), h2_task_queue, link)
-
-/**
- * Return the first queue in a list
- * @param b The list to query
- * @return The first queue in the list
- */
-#define H2_TQ_LIST_FIRST(b) APR_RING_FIRST(b)
-
-/**
- * Return the last queue in a list
- * @param b The list to query
- * @return The last queue int he list
- */
-#define H2_TQ_LIST_LAST(b) APR_RING_LAST(b)
-
-/**
- * Insert a single queue at the front of a list
- * @param b The list to add to
- * @param e The queue to insert
- */
-#define H2_TQ_LIST_INSERT_HEAD(b, e) do { \
-h2_task_queue *ap__b = (e); \
-APR_RING_INSERT_HEAD((b), ap__b, h2_task_queue, link); \
-} while (0)
-
-/**
- * Insert a single queue at the end of a list
- * @param b The list to add to
- * @param e The queue to insert
- */
-#define H2_TQ_LIST_INSERT_TAIL(b, e) do { \
-h2_task_queue *ap__b = (e); \
-APR_RING_INSERT_TAIL((b), ap__b, h2_task_queue, link); \
-} while (0)
-
-/**
- * Get the next queue in the list
- * @param e The current queue
- * @return The next queue
+ * Get the first task from the queue or NULL if the queue is empty.
+ * The task will be removed.
+ * @param q the queue to get the first task from
*/
-#define H2_TQ_NEXT(e) APR_RING_NEXT((e), link)
-/**
- * Get the previous queue in the list
- * @param e The current queue
- * @return The previous queue
- */
-#define H2_TQ_PREV(e) APR_RING_PREV((e), link)
-
-/**
- * Remove a queue from its list
- * @param e The queue to remove
- */
-#define H2_TQ_REMOVE(e) APR_RING_REMOVE((e), link)
-
-
-#define H2_TQ_EMPTY(e) H2_TASK_LIST_EMPTY(&(e)->tasks)
+h2_task *h2_tq_shift(h2_task_queue *q);
#endif /* defined(__mod_h2__h2_task_queue__) */
* @macro
* Version number of the h2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.0.2-DEV"
+#define MOD_HTTP2_VERSION "1.0.3-DEV"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010002
+#define MOD_HTTP2_VERSION_NUM 0x010003
#endif /* mod_h2_h2_version_h */