]> granicus.if.org Git - apache/blobdiff - server/core_filters.c
Merge r1542379 from trunk:
[apache] / server / core_filters.c
index 2c24b13a18b2a127a8b392678c6b44844aa32f51..84e11497c1b0da38ae124e62c37a8cecc6ab7865 100644 (file)
@@ -78,11 +78,23 @@ do { \
 #undef APLOG_MODULE_INDEX
 #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
 
-int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
-                         ap_input_mode_t mode, apr_read_type_e block,
-                         apr_off_t readbytes)
+struct core_output_filter_ctx {
+    apr_bucket_brigade *buffered_bb;
+    apr_bucket_brigade *tmp_flush_bb;
+    apr_pool_t *deferred_write_pool;
+    apr_size_t bytes_written;
+};
+
+struct core_filter_ctx {
+    apr_bucket_brigade *b;
+    apr_bucket_brigade *tmpbb;
+};
+
+
+apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
+                                  ap_input_mode_t mode, apr_read_type_e block,
+                                  apr_off_t readbytes)
 {
-    apr_bucket *e;
     apr_status_t rv;
     core_net_rec *net = f->ctx;
     core_ctx_t *ctx = net->in_ctx;
@@ -105,18 +117,13 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
 
     if (!ctx)
     {
-        /*
-         * Note that this code is never executed on Windows because the winnt
-         * MPM does the setup of net->in_ctx.
-         * XXX: This should be fixed.
-         */
-        ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+        net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
         ctx->b = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
-        ctx->tmpbb = apr_brigade_create(ctx->b->p, ctx->b->bucket_alloc);
+        ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
         /* seed the brigade with the client socket. */
-        e = apr_bucket_socket_create(net->client_socket, f->c->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(ctx->b, e);
-        net->in_ctx = ctx;
+        rv = ap_run_insert_network_bucket(f->c, ctx->b, net->client_socket);
+        if (rv != APR_SUCCESS)
+            return rv;
     }
     else if (APR_BRIGADE_EMPTY(ctx->b)) {
         return APR_EOF;
@@ -142,7 +149,7 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
          * empty).  We do this by returning whatever we have read.  This may
          * or may not be bogus, but is consistent (for now) with EOF logic.
          */
-        if (APR_STATUS_IS_EAGAIN(rv)) {
+        if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
             rv = APR_SUCCESS;
         }
         return rv;
@@ -228,7 +235,9 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
         e = APR_BRIGADE_FIRST(ctx->b);
         rv = apr_bucket_read(e, &str, &len, block);
 
-        if (APR_STATUS_IS_EAGAIN(rv)) {
+        if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+            /* getting EAGAIN for a blocking read is an error; for a
+             * non-blocking read, return an empty brigade. */
             return APR_SUCCESS;
         }
         else if (rv != APR_SUCCESS) {
@@ -368,7 +377,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
     apr_bucket_brigade *bb = NULL;
     apr_bucket *bucket, *next, *flush_upto = NULL;
     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
     apr_status_t rv;
 
     /* Fail quickly if the connection has already been aborted. */
@@ -382,10 +391,6 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
     if (ctx == NULL) {
         ctx = apr_pcalloc(c->pool, sizeof(*ctx));
         net->out_ctx = (core_output_filter_ctx_t *)ctx;
-        rv = apr_socket_opt_set(net->client_socket, APR_SO_NONBLOCK, 1);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
         /*
          * Need to create tmp brigade with correct lifetime. Passing
          * NULL to apr_brigade_split_ex would result in a brigade
@@ -414,7 +419,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
     }
 
     /* Scan through the brigade and decide whether to attempt a write,
-     * based on the following rules:
+     * and how much to write, based on the following rules:
      *
      *  1) The new_bb is null: Do a nonblocking write of as much as
      *     possible: do a nonblocking write of as much data as possible,
@@ -423,10 +428,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
      *     completion and has just determined that this connection
      *     is writable.)
      *
-     *  2) The brigade contains a flush bucket: Do a blocking write
+     *  2) Determine if and up to which bucket we need to do a blocking
+     *     write:
+     *
+     *  a) The brigade contains a flush bucket: Do a blocking write
      *     of everything up that point.
      *
-     *  3) The request is in CONN_STATE_HANDLER state, and the brigade
+     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
      *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
      *     buckets: Do blocking writes until the amount of data in the
      *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
@@ -434,14 +442,25 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
      *     streaming out lots of data faster than the data can be
      *     sent to the client.)
      *
-     *  4) The request is in CONN_STATE_HANDLER state, and the brigade
+     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
      *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
      *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
      *     buckets are left. (The point of this rule is to prevent too many
      *     FDs being kept open by pipelined requests, possibly allowing a
      *     DoS).
      *
-     *  5) The brigade contains at least THRESHOLD_MIN_WRITE
+     *  d) The brigade contains a morphing bucket: If there was no other
+     *     reason to do a blocking write yet, try reading the bucket. If its
+     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+     *     everything is fine. Otherwise we need to do a blocking write the
+     *     up to and including the morphing bucket, because ap_save_brigade()
+     *     would read the whole bucket into memory later on.
+     *
+     *  3) Actually do the blocking write up to the last bucket determined
+     *     by rules 2a-d. The point of doing only one flush is to make as
+     *     few calls to writev() as possible.
+     *
+     *  4) If the brigade contains at least THRESHOLD_MIN_WRITE
      *     bytes: Do a nonblocking write of as much data as possible,
      *     then save the rest in ctx->buffered_bb.
      */
@@ -454,6 +473,8 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
         }
         else if (rv != APR_SUCCESS) {
             /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
             c->aborted = 1;
         }
         setaside_remaining_output(f, ctx, bb, c);
@@ -463,40 +484,43 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
     bytes_in_brigade = 0;
     non_file_bytes_in_brigade = 0;
     eor_buckets_in_brigade = 0;
+    morphing_bucket_in_brigade = 0;
+
     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
         next = APR_BUCKET_NEXT(bucket);
 
         if (!APR_BUCKET_IS_METADATA(bucket)) {
             if (bucket->length == (apr_size_t)-1) {
-                const char *data;
-                apr_size_t length;
-                /* XXX support nonblocking read here? */
-                rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
-                if (rv != APR_SUCCESS) {
-                    return rv;
-                }
-                /* reading may have split the bucket, so recompute next: */
-                next = APR_BUCKET_NEXT(bucket);
+                /*
+                 * A setaside of morphing buckets would read everything into
+                 * memory. Instead, we will flush everything up to and
+                 * including this bucket.
+                 */
+                morphing_bucket_in_brigade = 1;
             }
-            bytes_in_brigade += bucket->length;
-            if (!APR_BUCKET_IS_FILE(bucket)) {
-                non_file_bytes_in_brigade += bucket->length;
+            else {
+                bytes_in_brigade += bucket->length;
+                if (!APR_BUCKET_IS_FILE(bucket))
+                    non_file_bytes_in_brigade += bucket->length;
             }
         }
         else if (AP_BUCKET_IS_EOR(bucket)) {
             eor_buckets_in_brigade++;
         }
 
-        if (APR_BUCKET_IS_FLUSH(bucket)                         ||
-            (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ||
-            (eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) )
-        {
+        if (APR_BUCKET_IS_FLUSH(bucket)
+            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+            || morphing_bucket_in_brigade
+            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+            /* this segment of the brigade MUST be sent before returning. */
+
             if (APLOGctrace6(c)) {
                 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
                                "FLUSH bucket" :
                                (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
                                "THRESHOLD_MAX_BUFFER" :
+                               morphing_bucket_in_brigade ? "morphing bucket" :
                                "MAX_REQUESTS_IN_PIPELINE";
                 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
                               "core_output_filter: flushing because of %s",
@@ -510,6 +534,7 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
             bytes_in_brigade = 0;
             non_file_bytes_in_brigade = 0;
             eor_buckets_in_brigade = 0;
+            morphing_bucket_in_brigade = 0;
         }
     }
 
@@ -520,6 +545,8 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
                                    &(ctx->bytes_written), c);
         if (rv != APR_SUCCESS) {
             /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
             c->aborted = 1;
             return rv;
         }
@@ -531,6 +558,8 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
                                       &(ctx->bytes_written), c);
         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
             /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
             c->aborted = 1;
             return rv;
         }
@@ -636,10 +665,26 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
         if (!APR_BUCKET_IS_METADATA(bucket)) {
             const char *data;
             apr_size_t length;
-            rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+            
+            /* Non-blocking read first, in case this is a morphing
+             * bucket type. */
+            rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
+            if (APR_STATUS_IS_EAGAIN(rv)) {
+                /* Read would block; flush any pending data and retry. */
+                if (nvec) {
+                    rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
+                    if (rv) {
+                        return rv;
+                    }
+                    nvec = 0;
+                }
+                
+                rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+            }
             if (rv != APR_SUCCESS) {
                 return rv;
             }
+
             /* reading may have split the bucket, so recompute next: */
             next = APR_BUCKET_NEXT(bucket);
             vec[nvec].iov_base = (char *)data;
@@ -700,7 +745,9 @@ static apr_status_t send_brigade_blocking(apr_socket_t *s,
                 pollset.reqevents = APR_POLLOUT;
                 pollset.desc.s = s;
                 apr_socket_timeout_get(s, &timeout);
-                rv = apr_poll(&pollset, 1, &nsds, timeout);
+                do {
+                    rv = apr_poll(&pollset, 1, &nsds, timeout);
+                } while (APR_STATUS_IS_EINTR(rv));
                 if (rv != APR_SUCCESS) {
                     break;
                 }