]> granicus.if.org Git - apache/commitdiff
fixing input WINDOW_UPDATE handling for new test cases
authorStefan Eissing <icing@apache.org>
Wed, 2 Dec 2015 14:38:32 +0000 (14:38 +0000)
committerStefan Eissing <icing@apache.org>
Wed, 2 Dec 2015 14:38:32 +0000 (14:38 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1717639 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c

index 6bd2b83739e9619a3e0b0b2b0383d429a2fa72dc..98ba60e6abb405a3a1fe43a71e3e83599da50182 100644 (file)
@@ -85,6 +85,17 @@ apr_off_t h2_io_out_length(h2_io *io)
     return 0;
 }
 
+apr_status_t h2_io_in_shutdown(h2_io *io)
+{
+    if (io->bbin) {
+        apr_off_t end_len = 0;
+        apr_brigade_length(io->bbin, 1, &end_len);
+        io->input_consumed += end_len;
+        apr_brigade_cleanup(io->bbin);
+    }
+    return h2_io_in_close(io);
+}
+
 apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb, 
                            apr_size_t maxlen)
 {
index dcf493539bb38be5e49d6836bdb8d8344be0da2c..08f3aa3dad73d8a6f3cb4ff638169091fd58b397 100644 (file)
@@ -107,6 +107,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
  */
 apr_status_t h2_io_in_close(h2_io *io);
 
+/**
+ * Shuts all input down. Will close input and mark any data buffered
+ * as consumed.
+ */
+apr_status_t h2_io_in_shutdown(h2_io *io);
+
 /*******************************************************************************
  * Output handling of streams.
  ******************************************************************************/
index c4efed6276d6ac66a31013881cc2a06a8417c757..333a9b5c6bd7b0fba1a91cd0cbeaed7a6f0cc564 100644 (file)
@@ -194,10 +194,28 @@ static void workers_unregister(h2_mplx *m) {
     h2_workers_unregister(m->workers, m);
 }
 
-static void io_destroy(h2_mplx *m, h2_io *io)
+static int io_process_events(h2_mplx *m, h2_io *io) {
+    if (io->input_consumed && m->input_consumed) {
+        m->input_consumed(m->input_consumed_ctx, 
+                          io->id, io->input_consumed);
+        io->input_consumed = 0;
+        return 1;
+    }
+    return 0;
+}
+
+
+static void io_destroy(h2_mplx *m, h2_io *io, int events)
 {
     apr_pool_t *pool = io->pool;
     
+    /* cleanup any buffered input */
+    h2_io_in_shutdown(io);
+    if (events) {
+        /* Process outstanding events before destruction */
+        io_process_events(m, io);
+    }
+    
     io->pool = NULL;    
     /* The pool is cleared/destroyed which also closes all
      * allocated file handles. Give this count back to our
@@ -222,7 +240,7 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
     h2_io_set_remove(m->ready_ios, io);
     if (io->task_done || h2_tq_remove(m->q, io->id)) {
         /* already finished or not even started yet */
-        io_destroy(m, io);
+        io_destroy(m, io, 1);
         return 0;
     }
     else {
@@ -310,7 +328,7 @@ void h2_mplx_task_done(h2_mplx *m, int stream_id)
         if (io) {
             io->task_done = 1;
             if (io->orphaned) {
-                io_destroy(m, io);
+                io_destroy(m, io, 0);
             }
             else {
                 /* hang around until the stream deregisteres */
@@ -371,6 +389,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
             if (io->input_arrived) {
                 apr_thread_cond_signal(io->input_arrived);
             }
+            io_process_events(m, io);
         }
         else {
             status = APR_EOF;
@@ -396,6 +415,7 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
             if (io->input_arrived) {
                 apr_thread_cond_signal(io->input_arrived);
             }
+            io_process_events(m, io);
         }
         else {
             status = APR_ECONNABORTED;
@@ -406,24 +426,26 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id)
 }
 
 typedef struct {
-    h2_mplx_consumed_cb *cb;
-    void *cb_ctx;
+    h2_mplx * m;
     int streams_updated;
 } update_ctx;
 
 static int update_window(void *ctx, h2_io *io)
 {
-    if (io->input_consumed) {
-        update_ctx *uctx = (update_ctx*)ctx;
-        uctx->cb(uctx->cb_ctx, io->id, io->input_consumed);
-        io->input_consumed = 0;
+    update_ctx *uctx = (update_ctx*)ctx;
+    if (io_process_events(uctx->m, io)) {
         ++uctx->streams_updated;
     }
     return 1;
 }
 
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
-                                       h2_mplx_consumed_cb *cb, void *cb_ctx)
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx)
+{
+    m->input_consumed = cb;
+    m->input_consumed_ctx = ctx;
+}
+
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(m);
@@ -434,8 +456,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
     if (APR_SUCCESS == status) {
         update_ctx ctx;
         
-        ctx.cb              = cb;
-        ctx.cb_ctx          = cb_ctx;
+        ctx.m               = m;
         ctx.streams_updated = 0;
 
         status = APR_EAGAIN;
@@ -549,11 +570,15 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
                               m->id, io->id);
                 io->orphaned = 1;
                 if (io->task_done) {
-                    io_destroy(m, io);
+                    io_destroy(m, io, 1);
                 }
                 else {
-                    /* hang around until the h2_task is done */
+                    /* hang around until the h2_task is done, but
+                     * shutdown input and send out any events (e.g. window
+                     * updates) asap. */
+                    h2_io_in_shutdown(io);
                     h2_io_rst(io, H2_ERR_STREAM_CLOSED);
+                    io_process_events(m, io);
                 }
             }
             
index f145428ff3cc2b0117739f587e6fa58cb2768160..f2805be373a1fb5866e3f88e97a146ac246a703e 100644 (file)
@@ -53,6 +53,12 @@ struct h2_task_queue;
 
 typedef struct h2_mplx h2_mplx;
 
+/**
+ * Callback invoked for every stream that had input data read since
+ * the last invocation.
+ */
+typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
+
 struct h2_mplx {
     long id;
     APR_RING_ENTRY(h2_mplx) link;
@@ -75,6 +81,9 @@ struct h2_mplx {
     apr_pool_t *spare_pool;           /* spare pool, ready for next io */
     struct h2_workers *workers;
     int file_handles_allowed;
+    
+    h2_mplx_consumed_cb *input_consumed;
+    void *input_consumed_ctx;
 };
 
 
@@ -173,6 +182,17 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
 
 struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, struct h2_worker *w, int *has_more);
 
+/**
+ * Register a callback for the amount of input data consumed per stream. The
+ * will only ever be invoked from the thread creating this h2_mplx, e.g. when
+ * calls from that thread into this h2_mplx are made.
+ *
+ * @param m the multiplexer to register the callback at
+ * @param cb the function to invoke
+ * @param ctx user supplied argument to invocation.
+ */
+void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx);
+
 /*******************************************************************************
  * Input handling of streams.
  ******************************************************************************/
@@ -207,20 +227,15 @@ apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
 int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
 
 /**
- * Callback invoked for every stream that had input data read since
- * the last invocation.
- */
-typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed);
-
-/**
- * Invoke the callback for all streams that had bytes read since the last
- * call to this function. If no stream had input data consumed, the callback
- * is not invoked.
+ * Invoke the consumed callback for all streams that had bytes read since the 
+ * last call to this function. If no stream had input data consumed, the 
+ * callback is not invoked.
+ * The consumed callback may also be invoked at other times whenever
+ * the need arises.
  * Returns APR_SUCCESS when an update happened, APR_EAGAIN if no update
  * happened.
  */
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m, 
-                                       h2_mplx_consumed_cb *cb, void *ctx);
+apr_status_t h2_mplx_in_update_windows(h2_mplx *m);
 
 /*******************************************************************************
  * Output handling of streams.
index ca1d87824de8ac3074d1fa17a69ab51c89670e22..cce8ca7d5ae95297d25e923950c9e06b555730b2 100644 (file)
@@ -57,6 +57,16 @@ static int h2_session_status_from_apr_status(apr_status_t rv)
     return NGHTTP2_ERR_PROTO;
 }
 
+static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
+{
+    h2_session *session = (h2_session*)ctx;
+    nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                  "h2_session(%ld-%d): consumed %ld bytes",
+                  session->id, stream_id, (long)bytes_read);
+}
+
+
 h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
 {
     h2_stream * stream;
@@ -221,6 +231,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
                   "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
                   session->id, stream_id, (long)len);
     if (status != APR_SUCCESS) {
+        update_window(session, stream_id, len);
         rv = nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id,
                                        H2_STREAM_RST(stream, H2_ERR_INTERNAL_ERROR));
         if (nghttp2_is_fatal(rv)) {
@@ -555,11 +566,11 @@ static int on_frame_send_cb(nghttp2_session *ngh2,
                             void *user_data)
 {
     h2_session *session = user_data;
-    if (APLOGctrace1(session->c)) {
+    if (APLOGcdebug(session->c)) {
         char buffer[256];
         
         frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
                       "h2_session(%ld): frame_send %s",
                       session->id, buffer);
     }
@@ -681,6 +692,8 @@ static h2_session *h2_session_create_int(conn_rec *c,
         session->workers = workers;
         session->mplx = h2_mplx_create(c, session->pool, config, workers);
         
+        h2_mplx_set_consumed_cb(session->mplx, update_window, session);
+        
         h2_conn_io_init(&session->io, c, config, session->pool);
         session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
         
@@ -998,12 +1011,6 @@ static int h2_session_resume_streams_with_data(h2_session *session) {
     return 0;
 }
 
-static void update_window(void *ctx, int stream_id, apr_off_t bytes_read)
-{
-    h2_session *session = (h2_session*)ctx;
-    nghttp2_session_consume(session->ngh2, stream_id, bytes_read);
-}
-
 h2_stream *h2_session_get_stream(h2_session *session, int stream_id)
 {
     if (!session->last_stream || stream_id != session->last_stream->id) {
@@ -1579,9 +1586,13 @@ apr_status_t h2_session_process(h2_session *session)
         }
         
         if (wait_micros > 0) {
-            ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c,
-                          "h2_session: wait for data, %ld micros", (long)(wait_micros));
-            h2_conn_io_pass(&session->io);
+            if (APLOGcdebug(session->c)) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c,
+                              "h2_session: wait for data, %ld micros", 
+                              (long)wait_micros);
+            }
+            nghttp2_session_send(session->ngh2);
+            h2_conn_io_flush(&session->io);
             status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait);
             
             if (status == APR_TIMEUP) {
@@ -1687,16 +1698,14 @@ apr_status_t h2_session_process(h2_session *session)
                 }
             }
             
-            if (h2_stream_set_has_open_input(session->streams)) {
-                /* Check that any pending window updates are sent. */
-                status = h2_mplx_in_update_windows(session->mplx, update_window, session);
-                if (APR_STATUS_IS_EAGAIN(status)) {
-                    status = APR_SUCCESS;
-                }
-                else if (status == APR_SUCCESS) {
-                    /* need to flush window updates onto the connection asap */
-                    h2_conn_io_flush(&session->io);
-                }
+            /* Check that any pending window updates are sent. */
+            status = h2_mplx_in_update_windows(session->mplx);
+            if (APR_STATUS_IS_EAGAIN(status)) {
+                status = APR_SUCCESS;
+            }
+            else if (status == APR_SUCCESS) {
+                /* need to flush window updates onto the connection asap */
+                h2_conn_io_flush(&session->io);
             }
         }