]> granicus.if.org Git - apache/commitdiff
mod_http2: some code cleanup of stream request body handling, potential avoid a buffe...
authorStefan Eissing <icing@apache.org>
Thu, 10 Mar 2016 15:51:14 +0000 (15:51 +0000)
committerStefan Eissing <icing@apache.org>
Thu, 10 Mar 2016 15:51:14 +0000 (15:51 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1734428 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h

index 3f82c60f1029db7578b2cf5c9aead2c5dcf6956a..92f4275173139e06829056eb2b8987e738802c91 100644 (file)
@@ -45,6 +45,14 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request)
     return io;
 }
 
+static void check_bbin(h2_io *io)
+{
+    if (!io->bbin) {
+        io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
+        io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
+    }
+}
+
 void h2_io_redo(h2_io *io)
 {
     io->worker_started = 0;
@@ -85,23 +93,12 @@ void h2_io_set_response(h2_io *io, h2_response *response)
     }
 }
 
-
 void h2_io_rst(h2_io *io, int error)
 {
     io->rst_error = error;
     io->eos_in = 1;
 }
 
-int h2_io_in_has_eos_for(h2_io *io)
-{
-    return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
-}
-
-int h2_io_in_has_data(h2_io *io)
-{
-    return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
-}
-
 int h2_io_out_has_data(h2_io *io)
 {
     return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -298,7 +295,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
     return status;
 }
 
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos)
 {
     if (io->rst_error) {
         return APR_ECONNABORTED;
@@ -307,13 +304,12 @@ apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb)
     if (io->eos_in) {
         return APR_EOF;
     }
-    io->eos_in = h2_util_has_eos(bb, -1);
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        if (!io->bbin) {
-            io->bbin = apr_brigade_create(io->pool, io->bucket_alloc);
-            io->tmp = apr_brigade_create(io->pool, io->bucket_alloc);
-        }
-        return h2_util_move(io->bbin, bb, -1, NULL, "h2_io_in_write");
+    if (eos) {
+        io->eos_in = 1;
+    }
+    if (len > 0) {
+        check_bbin(io);
+        return apr_brigade_write(io->bbin, NULL, NULL, d, len);
     }
     return APR_SUCCESS;
 }
index d92b7eb0d423f68b14778d8b4c21e05b784ec18d..b9742f99fe16d2221bc8f9da0eecc3cea5d780e4 100644 (file)
@@ -92,19 +92,10 @@ void h2_io_rst(h2_io *io, int error);
 int h2_io_is_repeatable(h2_io *io);
 void h2_io_redo(h2_io *io);
 
-/**
- * The input data is completely queued. Blocked reads will return immediately
- * and give either data or EOF.
- */
-int h2_io_in_has_eos_for(h2_io *io);
 /**
  * Output data is available.
  */
 int h2_io_out_has_data(h2_io *io);
-/**
- * Input data is available.
- */
-int h2_io_in_has_data(h2_io *io);
 
 void h2_io_signal(h2_io *io, h2_io_op op);
 void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 
@@ -127,7 +118,7 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
 /**
  * Appends given bucket to the input.
  */
-apr_status_t h2_io_in_write(h2_io *io, apr_bucket_brigade *bb);
+apr_status_t h2_io_in_write(h2_io *io, const char *d, apr_size_t len, int eos);
 
 /**
  * Closes the input. After existing data has been read, APR_EOF will
index 4d7f63bb52e981f718f1b801478e66b85234af3d..b60d328959bf7d89bc3b5febba215fa6c3cc54d9 100644 (file)
@@ -498,7 +498,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
 }
 
 apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
-                              apr_bucket_brigade *bb)
+                              const char *data, apr_size_t len, int eos)
 {
     apr_status_t status;
     int acquired;
@@ -508,7 +508,7 @@ apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id,
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io && !io->orphaned) {
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_pre");
-            status = h2_io_in_write(io, bb);
+            status = h2_io_in_write(io, data, len, eos);
             H2_MPLX_IO_IN(APLOG_TRACE2, m, io, "h2_mplx_in_write_post");
             h2_io_signal(io, H2_IO_READ);
             io_process_events(m, io);
@@ -898,46 +898,6 @@ apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
     return status;
 }
 
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
-{
-    int has_eos = 0;
-    int acquired;
-    
-    apr_status_t status;
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_eos = h2_io_in_has_eos_for(io);
-        }
-        else {
-            has_eos = 1;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_eos;
-}
-
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
-{
-    apr_status_t status;
-    int has_data = 0;
-    int acquired;
-    
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
-        if (io && !io->orphaned) {
-            has_data = h2_io_in_has_data(io);
-        }
-        else {
-            has_data = 0;
-        }
-        leave_mutex(m, acquired);
-    }
-    return has_data;
-}
-
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
index a61a63891ae63b6dbb3b39c8083f163b5de8aeac..e33d5e5a2ad237283321d05a8fcc43ef59293265 100644 (file)
@@ -171,10 +171,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
  */
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
 
-/* Return != 0 iff the multiplexer has input data for the given stream. 
- */
-int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
-
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -238,20 +234,14 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
  * Appends data to the input of the given stream. Storage of input data is
  * not subject to flow control.
  */
-apr_status_t h2_mplx_in_write(h2_mplx *mplx, int stream_id, 
-                              apr_bucket_brigade *bb);
+apr_status_t h2_mplx_in_write(h2_mplx *m, int stream_id, 
+                              const char *data, apr_size_t len, int eos);
 
 /**
  * Closes the input for the given stream_id.
  */
 apr_status_t h2_mplx_in_close(h2_mplx *m, int stream_id);
 
-/**
- * Returns != 0 iff the input for the given stream has been closed. There
- * could still be data queued, but it can be read without blocking.
- */
-int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id);
-
 /**
  * Invoke the consumed callback for all streams that had bytes read since the 
  * last call to this function. If no stream had input data consumed, the 
index d99573850d5d50743b13ab9061b50ab4ddc13f89..78b91efccfdb7e0ae1d3d5d9683b452b8a059971 100644 (file)
@@ -235,8 +235,11 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags,
         }
         return 0;
     }
-    
-    status = h2_stream_write_data(stream, (const char *)data, len);
+
+    /* FIXME: enabling setting EOS this way seems to break input handling
+     * in mod_proxy_http2. why? */
+    status = h2_stream_write_data(stream, (const char *)data, len,
+                                  0 /*flags & NGHTTP2_FLAG_END_STREAM*/);
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c,
                   "h2_stream(%ld-%d): data_chunk_recv, written %ld bytes",
                   session->id, stream_id, (long)len);
index 29df7afd82352d84990bbdaf89dda5d97cd12847..2b368b67cf04f2b6fd49bc827812b6f3ec8bb101 100644 (file)
 #include "h2_util.h"
 
 
-#define H2_STREAM_IN(lvl,s,msg) \
-    do { \
-        if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
-        h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
-    } while(0)
-    
-
 static int state_transition[][7] = {
     /*  ID OP RL RR CI CO CL */
 /*ID*/{  1, 0, 0, 0, 0, 0, 0 },
@@ -144,19 +137,13 @@ static int output_open(h2_stream *stream)
 
 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
 
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
+h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
 {
     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
     stream->id        = id;
     stream->state     = H2_STREAM_ST_IDLE;
     stream->pool      = pool;
     stream->session   = session;
-    return stream;
-}
-
-h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
-{
-    h2_stream *stream = h2_stream_create(id, pool, session);
     set_state(stream, H2_STREAM_ST_OPEN);
     stream->request   = h2_request_create(id, pool, 
         h2_config_geti(session->config, H2_CONF_SER_HEADERS));
@@ -296,8 +283,6 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled,
     if (status == APR_SUCCESS) {
         if (!eos) {
             stream->request->body = 1;
-            stream->bbin = apr_brigade_create(stream->pool, 
-                                              stream->session->c->bucket_alloc);
         }
         stream->input_remaining = stream->request->content_length;
         
@@ -328,33 +313,6 @@ int h2_stream_is_scheduled(const h2_stream *stream)
     return stream->scheduled;
 }
 
-static apr_status_t h2_stream_input_flush(h2_stream *stream)
-{
-    apr_status_t status = APR_SUCCESS;
-    if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
-
-        status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
-        if (status != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
-                          "h2_stream(%ld-%d): flushing input data",
-                          stream->session->id, stream->id);
-        }
-    }
-    return status;
-}
-
-static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) 
-{
-    (void)bb;
-    return h2_stream_input_flush(ctx);
-}
-
-static apr_status_t input_add_data(h2_stream *stream,
-                                   const char *data, size_t len)
-{
-    return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
-}
-
 apr_status_t h2_stream_close_input(h2_stream *stream)
 {
     apr_status_t status = APR_SUCCESS;
@@ -368,28 +326,23 @@ apr_status_t h2_stream_close_input(h2_stream *stream)
         return APR_ECONNRESET;
     }
     
-    H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
-    if (close_input(stream) && stream->bbin) {
-        status = h2_stream_input_flush(stream);
-        if (status == APR_SUCCESS) {
-            status = h2_mplx_in_close(stream->session->mplx, stream->id);
-        }
+    if (close_input(stream)) {
+        status = h2_mplx_in_close(stream->session->mplx, stream->id);
     }
-    H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
     return status;
 }
 
 apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len)
+                                  const char *data, size_t len, int eos)
 {
     apr_status_t status = APR_SUCCESS;
     
     AP_DEBUG_ASSERT(stream);
-    if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
+    if (input_closed(stream) || !stream->request->eoh) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
-                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", 
+                      "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d", 
                       stream->session->id, stream->id, input_closed(stream),
-                      stream->request->eoh, !!stream->bbin);
+                      stream->request->eoh);
         return APR_EINVAL;
     }
 
@@ -397,7 +350,6 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
                   "h2_stream(%ld-%d): add %ld input bytes", 
                   stream->session->id, stream->id, (long)len);
 
-    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
     if (!stream->request->chunked) {
         stream->input_remaining -= len;
         if (stream->input_remaining < 0) {
@@ -413,11 +365,10 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
         }
     }
     
-    status = input_add_data(stream, data, len);
-    if (status == APR_SUCCESS) {
-        status = h2_stream_input_flush(stream);
+    status = h2_mplx_in_write(stream->session->mplx, stream->id, data, len, eos);
+    if (eos) {
+        close_input(stream);
     }
-    H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
     return status;
 }
 
index 7d724259fa51c1989c7e128ce94ae2809a63342e..b7df632502c0fa79748f78ee99b89b35c18c65d6 100644 (file)
@@ -57,7 +57,6 @@ struct h2_stream {
     unsigned int submitted : 1; /* response HEADER has been sent */
     
     apr_off_t input_remaining;  /* remaining bytes on input as advertised via content-length */
-    apr_bucket_brigade *bbin;   /* input DATA */
 
     struct h2_sos *sos;         /* stream output source, e.g. to read output from */
     apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */
@@ -66,15 +65,6 @@ struct h2_stream {
 
 #define H2_STREAM_RST(s, def)    (s->rst_error? s->rst_error : (def))
 
-/**
- * Create a stream in IDLE state.
- * @param id      the stream identifier
- * @param pool    the memory pool to use for this stream
- * @param session the session this stream belongs to
- * @return the newly created IDLE stream
- */
-h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_session *session);
-
 /**
  * Create a stream in OPEN state.
  * @param id      the stream identifier
@@ -155,7 +145,7 @@ apr_status_t h2_stream_close_input(h2_stream *stream);
  * @param len the number of bytes to write
  */
 apr_status_t h2_stream_write_data(h2_stream *stream,
-                                  const char *data, size_t len);
+                                  const char *data, size_t len, int eos);
 
 /**
  * Reset the stream. Stream write/reads will return errors afterwards.