From d047dc85d7d0c00dd3d20aacb2522cb1600353c5 Mon Sep 17 00:00:00 2001 From: Stefan Eissing Date: Thu, 29 Oct 2015 12:20:53 +0000 Subject: [PATCH] inserting needed flush buckets on master connections again, rewrite task queue for better mem usage and less copying git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1711235 13f79535-47bb-0310-9956-ffa450edef68 --- modules/http2/h2_conn_io.c | 11 ++- modules/http2/h2_h2.c | 30 +++++++ modules/http2/h2_h2.h | 7 ++ modules/http2/h2_mplx.c | 6 +- modules/http2/h2_session.c | 3 +- modules/http2/h2_task_queue.c | 153 +++++++++++++++++----------------- modules/http2/h2_task_queue.h | 51 ++++++------ 7 files changed, 149 insertions(+), 112 deletions(-) diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index cf03651434..34b4c30bf8 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -190,7 +190,7 @@ apr_status_t h2_conn_io_read(h2_conn_io *io, return status; } -static apr_status_t flush_out(apr_bucket_brigade *bb, void *ctx) +static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) { h2_conn_io *io = (h2_conn_io*)ctx; apr_status_t status; @@ -274,7 +274,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, apr_size_t avail = io->bufsize - io->buflen; if (avail <= 0) { bucketeer_buffer(io); - status = flush_out(io->output, io); + status = pass_out(io->output, io); io->buflen = 0; } else if (length > avail) { @@ -293,7 +293,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, } else { - status = apr_brigade_write(io->output, flush_out, io, buf, length); + status = apr_brigade_write(io->output, pass_out, io, buf, length); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, io->connection, "h2_conn_io: write error"); @@ -315,8 +315,11 @@ apr_status_t h2_conn_io_flush(h2_conn_io *io) bucketeer_buffer(io); io->buflen = 0; } + + APR_BRIGADE_INSERT_TAIL(io->output, + apr_bucket_flush_create(io->output->bucket_alloc)); /* Send it out */ - status = flush_out(io->output, io); + status = pass_out(io->output, io); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, io->connection, diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index 156d1902b7..09cc14b488 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -64,6 +64,36 @@ static char *(*opt_ssl_var_lookup)(apr_pool_t *, server_rec *, conn_rec *, request_rec *, char *); + +/******************************************************************************* + * HTTP/2 error stuff + */ +static const char *h2_err_descr[] = { + "no error", /* 0x0 */ + "protocol error", + "internal error", + "flow control error", + "settings timeout", + "stream closed", /* 0x5 */ + "frame size error", + "refused stream", + "cancel", + "compression error", + "connect error", /* 0xa */ + "enhance your calm", + "inadequate security", + "http/1.1 required", +}; + +const char *h2_h2_err_description(int h2_error) +{ + if (h2_error >= 0 + && h2_error < (sizeof(h2_err_descr)/sizeof(h2_err_descr[0]))) { + return h2_err_descr[h2_error]; + } + return "unknown http/2 errotr code"; +} + /******************************************************************************* * Check connection security requirements of RFC 7540 */ diff --git a/modules/http2/h2_h2.h b/modules/http2/h2_h2.h index 50f2614b29..f04cc786e3 100644 --- a/modules/http2/h2_h2.h +++ b/modules/http2/h2_h2.h @@ -49,6 +49,13 @@ extern const char *H2_MAGIC_TOKEN; #define H2_ERR_INADEQUATE_SECURITY (0x0c) #define H2_ERR_HTTP_1_1_REQUIRED (0x0d) +/** + * Provide a user readable description of the HTTP/2 error code- + * @param h2_error http/2 error code, as in rfc 7540, ch. 7 + * @return textual description of code or that it is unknown. + */ +const char *h2_h2_err_description(int h2_error); + /* * One time, post config intialization. */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 4ab7181707..bcb1c3840f 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -60,10 +60,6 @@ static void h2_mplx_destroy(h2_mplx *m) { AP_DEBUG_ASSERT(m); m->aborted = 1; - if (m->q) { - h2_tq_destroy(m->q); - m->q = NULL; - } if (m->ready_ios) { h2_io_set_destroy(m->ready_ios); m->ready_ios = NULL; @@ -128,7 +124,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_workers *workers) m->bucket_alloc = apr_bucket_alloc_create(m->pool); - m->q = h2_tq_create(m->pool, 23); + m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); m->closed = h2_stream_set_create(m->pool); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 4f7795b945..0f317ffb3a 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -313,7 +313,7 @@ static apr_status_t stream_destroy(h2_session *session, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_stream(%ld-%d): closing with err=%d %s", session->id, (int)stream->id, (int)error_code, - nghttp2_strerror(error_code)); + h2_h2_err_description(error_code)); h2_stream_rst(stream, error_code); } @@ -988,6 +988,7 @@ apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout) if (session->reprioritize) { h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session); + session->reprioritize = 0; } /* Check that any pending window updates are sent. */ diff --git a/modules/http2/h2_task_queue.c b/modules/http2/h2_task_queue.c index f97a99fc01..1653aa26e2 100644 --- a/modules/http2/h2_task_queue.c +++ b/modules/http2/h2_task_queue.c @@ -23,26 +23,24 @@ #include "h2_task_queue.h" -static void grow(h2_task_queue *q, int nlen); -static h2_task *qrm(h2_task_queue *q, int index); -static void tqsort(h2_task_queue *q, int left, int right, - h2_tq_cmp *cmp, void *ctx); +static void tq_grow(h2_task_queue *q, int nlen); +static void tq_swap(h2_task_queue *q, int i, int j); +static int tq_bubble_up(h2_task_queue *q, int i, int top, + h2_tq_cmp *cmp, void *ctx); +static int tq_bubble_down(h2_task_queue *q, int i, int bottom, + h2_tq_cmp *cmp, void *ctx); h2_task_queue *h2_tq_create(apr_pool_t *pool, int capacity) { h2_task_queue *q = apr_pcalloc(pool, sizeof(h2_task_queue)); if (q) { q->pool = pool; - grow(q, capacity); + tq_grow(q, capacity); q->nelts = 0; } return q; } -void h2_tq_destroy(h2_task_queue *q) -{ -} - int h2_tq_empty(h2_task_queue *q) { return q->nelts == 0; @@ -54,108 +52,111 @@ void h2_tq_add(h2_task_queue *q, struct h2_task *task, int i; if (q->nelts >= q->nalloc) { - grow(q, q->nalloc * 2); + tq_grow(q, q->nalloc * 2); } - /* Assume tasks most commonly arrive in ascending order */ - for (i = q->nelts; i > 0; --i) { - if (cmp(q->elts[i-1], task, ctx) <= 0) { - if (i < q->nelts) { - memmove(q->elts+i+1, q->elts+i, q->nelts - i); - } - q->elts[i] = task; - q->nelts++; - return; - } - } - /* insert at front */ - if (q->nelts) { - memmove(q->elts+1, q->elts, q->nelts); - } - q->elts[q->nelts++] = task; + i = (q->head + q->nelts) % q->nalloc; + q->elts[i] = task; + ++q->nelts; + + /* bubble it to the front of the queue */ + tq_bubble_up(q, i, q->head, cmp, ctx); } void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx) { - tqsort(q, 0, q->nelts - 1, cmp, ctx); + /* Assume that changes in ordering are minimal. This needs, + * best case, q->nelts - 1 comparisions to check that nothing + * changed. + */ + if (q->nelts > 0) { + int i, ni, prev, last; + + /* Start at the end of the queue and create a tail of sorted + * entries. Make that tail one element longer in each iteration. + */ + last = i = (q->head + q->nelts - 1) % q->nalloc; + while (i != q->head) { + prev = (q->nalloc + i - 1) % q->nalloc; + + ni = tq_bubble_up(q, i, prev, cmp, ctx); + if (ni == prev) { + /* i bubbled one up, bubble the new i down, which + * keeps all tasks below i sorted. */ + tq_bubble_down(q, i, last, cmp, ctx); + } + i = prev; + }; + } } -apr_status_t h2_tq_remove(h2_task_queue *q, struct h2_task *task) +h2_task *h2_tq_shift(h2_task_queue *q) { - int i; + h2_task *t; - for (i = 0; i < q->nelts; ++i) { - if (task == q->elts[i]) { - qrm(q, i); - return APR_SUCCESS; - } + if (q->nelts <= 0) { + return NULL; } - return APR_NOTFOUND; -} - -h2_task *h2_tq_shift(h2_task_queue *q) -{ - return qrm(q, 0); + + t = q->elts[q->head]; + q->head = (q->head + 1) % q->nalloc; + q->nelts--; + + return t; } -static void grow(h2_task_queue *q, int nlen) +static void tq_grow(h2_task_queue *q, int nlen) { AP_DEBUG_ASSERT(q->nalloc <= nlen); if (nlen > q->nalloc) { h2_task **nq = apr_pcalloc(q->pool, sizeof(h2_task *) * nlen); if (q->nelts > 0) { - memmove(nq, q->elts, sizeof(h2_task *) * q->nelts); + int l = ((q->head + q->nelts) % q->nalloc) - q->head; + + memmove(nq, q->elts + q->head, sizeof(h2_task *) * l); + if (l < q->nelts) { + /* elts wrapped, append elts in [0, remain] to nq */ + int remain = q->nelts - l; + memmove(nq + l, q->elts, sizeof(h2_task *) * remain); + } } q->elts = nq; q->nalloc = nlen; + q->head = 0; } } -static h2_task *qrm(h2_task_queue *q, int index) -{ - if (index >= q->nelts) { - return NULL; - } - else if (index == q->nelts - 1) { - q->nelts--; - return q->elts[index]; - } - else { - h2_task *t = q->elts[index]; - q->nelts--; - memmove(q->elts+index, q->elts+index+1, - sizeof(q->elts[0]) * (q->nelts - index)); - return t; - } -} - -static void tqswap(h2_task_queue *q, int i, int j) +static void tq_swap(h2_task_queue *q, int i, int j) { h2_task *t = q->elts[i]; q->elts[i] = q->elts[j]; q->elts[j] = t; } -static void tqsort(h2_task_queue *q, int left, int right, - h2_tq_cmp *cmp, void *ctx) +static int tq_bubble_up(h2_task_queue *q, int i, int top, + h2_tq_cmp *cmp, void *ctx) { - int i, last; - - if (left >= right) - return; - tqswap(q, left, (left + right)/2); - last = left; - for (i = left+1; i <= right; i++) { - if ((*cmp)(q->elts[i], q->elts[left], ctx) < 0) { - tqswap(q, ++last, i); - } + int prev; + while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) + && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) { + tq_swap(q, prev, i); + i = prev; } - tqswap(q, left, last); - tqsort(q, left, last-1, cmp, ctx); - tqsort(q, last+1, right, cmp, ctx); + return i; } +static int tq_bubble_down(h2_task_queue *q, int i, int bottom, + h2_tq_cmp *cmp, void *ctx) +{ + int next; + while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) + && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) { + tq_swap(q, next, i); + i = next; + } + return i; +} diff --git a/modules/http2/h2_task_queue.h b/modules/http2/h2_task_queue.h index e7e496bcc3..36fad2c4d8 100644 --- a/modules/http2/h2_task_queue.h +++ b/modules/http2/h2_task_queue.h @@ -19,18 +19,32 @@ struct h2_task; /** - * A simple ring of rings that keeps a list of h2_tasks and can - * be ringed itself, using the APR RING macros. + * h2_task_queue keeps a list of sorted h2_task* in ascending order. */ typedef struct h2_task_queue h2_task_queue; struct h2_task_queue { - apr_pool_t *pool; struct h2_task **elts; + int head; int nelts; int nalloc; + apr_pool_t *pool; }; +/** + * Comparator for two task to determine their order. + * + * @param t1 task to compare + * @param t2 task to compare + * @param ctx provided user data + * @return value is the same as for strcmp() and has the effect: + * == 0: t1 and t2 are treated equal in ordering + * < 0: t1 should be sorted before t2 + * > 0: t2 should be sorted before t1 + */ +typedef int h2_tq_cmp(struct h2_task *t1, struct h2_task *t2, void *ctx); + + /** * Allocate a new queue from the pool and initialize. * @param id the identifier of the queue @@ -38,54 +52,39 @@ struct h2_task_queue { */ h2_task_queue *h2_tq_create(apr_pool_t *pool, int capacity); -/** - * Release all queue tasks. - * @param q the queue to destroy - */ -void h2_tq_destroy(h2_task_queue *q); - /** * Return != 0 iff there are no tasks in the queue. * @param q the queue to check */ int h2_tq_empty(h2_task_queue *q); -typedef int h2_tq_cmp(struct h2_task *t1, struct h2_task *t2, void *ctx); - /** - * Add the task to the sorted queue. For optimiztation, it is assumed - * that the order of the existing tasks has not changed. + * Add the task to the queue. * * @param q the queue to append the task to * @param task the task to add - * @param cmp the compare function for sorting - * @param ctx user data for the compare function + * @param cmp the comparator for sorting + * @param ctx user data for comparator */ void h2_tq_add(h2_task_queue *q, struct h2_task *task, h2_tq_cmp *cmp, void *ctx); /** - * Sort the tasks queue again. Useful to call if the task order + * Sort the tasks queue again. Call if the task ordering * has changed. * * @param q the queue to sort - * @param cmp the compare function for sorting - * @param ctx user data for the compare function + * @param cmp the comparator for sorting + * @param ctx user data for the comparator */ void h2_tq_sort(h2_task_queue *q, h2_tq_cmp *cmp, void *ctx); -/** - * Remove a task from the queue. Return APR_SUCCESS if the task - * was indeed queued, APR_NOTFOUND otherwise. - * @param q the queue to remove from - * @param task the task to remove - */ -apr_status_t h2_tq_remove(h2_task_queue *q, struct h2_task *task); - /** * Get the first task from the queue or NULL if the queue is empty. * The task will be removed. + * * @param q the queue to get the first task from + * @return the first task of the queue, NULL if empty */ h2_task *h2_tq_shift(h2_task_queue *q); -- 2.40.0