granicus.if.org Git - apache/blobdiff - server/core_filters.c
Merge r1615026 from trunk:
[apache] / server / core_filters.c
index 90f5d4bf7ba0cdc6c69f5e4af0c43486aa8e3fc9..8700b76d532f417d91fffdbfe803f4d534023ed5 100644 (file)
@@ -1,9 +1,9 @@
-/* Copyright 2001-2005 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
 #include "apr_fnmatch.h"
 #include "apr_hash.h"
 #include "apr_thread_proc.h"    /* for RLIMIT stuff */
-#include "apr_hooks.h"
 
 #define APR_WANT_IOVEC
 #define APR_WANT_STRFUNC
 #define APR_WANT_MEMFUNC
 #include "apr_want.h"
 
-#define CORE_PRIVATE
 #include "ap_config.h"
 #include "httpd.h"
 #include "http_config.h"
@@ -47,7 +45,6 @@
 #include "apr_buckets.h"
 #include "util_filter.h"
 #include "util_ebcdic.h"
-#include "mpm.h"
 #include "mpm_common.h"
 #include "scoreboard.h"
 #include "mod_core.h"
@@ -77,32 +74,27 @@ do { \
     } while (!APR_BRIGADE_EMPTY(b) && (e != APR_BRIGADE_SENTINEL(b))); \
 } while (0)
 
+/* we know core's module_index is 0 */
+#undef APLOG_MODULE_INDEX
+#define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
 
-/**
- * Split the contents of a brigade after bucket 'e' to an existing brigade
- *
- * XXXX: Should this function be added to APR-Util?
- */
-static void brigade_move(apr_bucket_brigade *b, apr_bucket_brigade *a,
-                         apr_bucket *e)
-{
-    apr_bucket *f;
+struct core_output_filter_ctx {
+    apr_bucket_brigade *buffered_bb;
+    apr_bucket_brigade *tmp_flush_bb;
+    apr_pool_t *deferred_write_pool;
+    apr_size_t bytes_written;
+};
 
-    if (e != APR_BRIGADE_SENTINEL(b)) {
-        f = APR_RING_LAST(&b->list);
-        APR_RING_UNSPLICE(e, f, link);
-        APR_RING_SPLICE_HEAD(&a->list, e, f, apr_bucket, link);
-    }
+struct core_filter_ctx {
+    apr_bucket_brigade *b;
+    apr_bucket_brigade *tmpbb;
+};
 
-    APR_BRIGADE_CHECK_CONSISTENCY(a);
-    APR_BRIGADE_CHECK_CONSISTENCY(b);
-}
 
-int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
-                         ap_input_mode_t mode, apr_read_type_e block,
-                         apr_off_t readbytes)
+apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
+                                  ap_input_mode_t mode, apr_read_type_e block,
+                                  apr_off_t readbytes)
 {
-    apr_bucket *e;
     apr_status_t rv;
     core_net_rec *net = f->ctx;
     core_ctx_t *ctx = net->in_ctx;
@@ -125,13 +117,13 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
 
     if (!ctx)
     {
-        ctx = apr_pcalloc(f->c->pool, sizeof(*ctx));
+        net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
         ctx->b = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
-        ctx->tmpbb = apr_brigade_create(ctx->b->p, ctx->b->bucket_alloc);
+        ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
         /* seed the brigade with the client socket. */
-        e = apr_bucket_socket_create(net->client_socket, f->c->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(ctx->b, e);
-        net->in_ctx = ctx;
+        rv = ap_run_insert_network_bucket(f->c, ctx->b, net->client_socket);
+        if (rv != APR_SUCCESS)
+            return rv;
     }
     else if (APR_BRIGADE_EMPTY(ctx->b)) {
         return APR_EOF;
@@ -157,7 +149,7 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
          * empty).  We do this by returning whatever we have read.  This may
          * or may not be bogus, but is consistent (for now) with EOF logic.
          */
-        if (APR_STATUS_IS_EAGAIN(rv)) {
+        if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
             rv = APR_SUCCESS;
         }
         return rv;
@@ -214,10 +206,9 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
      * the brigade that was passed down, and send that brigade back.
      *
      * NOTE:  This is VERY dangerous to use, and should only be done with
-     * extreme caution.  However, the Perchild MPM needs this feature
-     * if it is ever going to work correctly again.  With this, the Perchild
-     * MPM can easily request the socket and all data that has been read,
-     * which means that it can pass it to the correct child process.
+     * extreme caution.  For what little it's worth, an MPM like Perchild
+     * would need this; such an MPM can easily request the socket and all
+     * data that has been read, so it can pass it to the correct child process.
      */
     if (mode == AP_MODE_EXHAUSTIVE) {
         apr_bucket *e;
@@ -244,7 +235,9 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
         e = APR_BRIGADE_FIRST(ctx->b);
         rv = apr_bucket_read(e, &str, &len, block);
 
-        if (APR_STATUS_IS_EAGAIN(rv)) {
+        if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
+            /* getting EAGAIN for a blocking read is an error; for a
+             * non-blocking read, return an empty brigade. */
             return APR_SUCCESS;
         }
         else if (rv != APR_SUCCESS) {
@@ -268,6 +261,37 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
             return APR_SUCCESS;
         }
 
+        /* Have we read as much data as we wanted (be greedy)? */
+        if (len < readbytes) {
+            apr_size_t bucket_len;
+
+            rv = APR_SUCCESS;
+            /* len already accounts for the data in e */
+            e = APR_BUCKET_NEXT(e);
+            while ((len < readbytes) && (rv == APR_SUCCESS)
+                   && (e != APR_BRIGADE_SENTINEL(ctx->b))) {
+                /* Check for the availability of buckets with known length */
+                if (e->length != -1) {
+                    len += e->length;
+                    e = APR_BUCKET_NEXT(e);
+                }
+                else {
+                    /*
+                     * Read from bucket, but non blocking. If there isn't any
+                     * more data, well, then this is fine as well; we will
+                     * not wait for more since we already got some and we are
+                     * only checking if there isn't more.
+                     */
+                    rv = apr_bucket_read(e, &str, &bucket_len,
+                                         APR_NONBLOCK_READ);
+                    if (rv == APR_SUCCESS) {
+                        len += bucket_len;
+                        e = APR_BUCKET_NEXT(e);
+                    }
+                }
+            }
+        }
+
         /* We can only return at most what we read. */
         if (len < readbytes) {
             readbytes = len;
@@ -279,7 +303,7 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
         }
 
         /* Must do move before CONCAT */
-        brigade_move(ctx->b, ctx->tmpbb, e);
+        ctx->tmpbb = apr_brigade_split_ex(ctx->b, e, ctx->tmpbb);
 
         if (mode == AP_MODE_READBYTES) {
             APR_BRIGADE_CONCAT(b, ctx->b);
@@ -308,16 +332,14 @@ int ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
 static void setaside_remaining_output(ap_filter_t *f,
                                       core_output_filter_ctx_t *ctx,
                                       apr_bucket_brigade *bb,
-                                      int make_a_copy, conn_rec *c);
+                                      conn_rec *c);
 
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
                                              apr_size_t *bytes_written,
                                              conn_rec *c);
 
-static void detect_error_bucket(apr_bucket *bucket, conn_rec *c);
-
-static void remove_empty_buckets(apr_bucket_brigade *bb, conn_rec *c);
+static void remove_empty_buckets(apr_bucket_brigade *bb);
 
 static apr_status_t send_brigade_blocking(apr_socket_t *s,
                                           apr_bucket_brigade *bb,
@@ -330,64 +352,74 @@ static apr_status_t writev_nonblocking(apr_socket_t *s,
                                        apr_size_t *cumulative_bytes_written,
                                        conn_rec *c);
 
+#if APR_HAS_SENDFILE
 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
-                                         apr_bucket_brigade *bb,
+                                         apr_bucket *bucket,
                                          apr_size_t *cumulative_bytes_written,
                                          conn_rec *c);
+#endif
 
+/* XXX: Should these be configurable parameters? */
 #define THRESHOLD_MIN_WRITE 4096
 #define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
 
 /* Optional function coming from mod_logio, used for logging of output
  * traffic
  */
-extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *logio_add_bytes_out;
+extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
 
 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket_brigade *bb;
-    apr_bucket *bucket, *next;
+    apr_bucket_brigade *bb = NULL;
+    apr_bucket *bucket, *next, *flush_upto = NULL;
     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    apr_status_t rv;
+
+    /* Fail quickly if the connection has already been aborted. */
+    if (c->aborted) {
+        if (new_bb != NULL) {
+            apr_brigade_cleanup(new_bb);
+        }
+        return APR_ECONNABORTED;
+    }
 
     if (ctx == NULL) {
-        apr_status_t rv;
         ctx = apr_pcalloc(c->pool, sizeof(*ctx));
         net->out_ctx = (core_output_filter_ctx_t *)ctx;
-        rv = apr_socket_opt_set(net->client_socket, APR_SO_NONBLOCK, 1);
-        if (rv != APR_SUCCESS) {
-            return rv;
-        }
+        /*
+         * Need to create tmp brigade with correct lifetime. Passing
+         * NULL to apr_brigade_split_ex would result in a brigade
+         * allocated from bb->pool which might be wrong.
+         */
+        ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
+        /* same for buffered_bb and ap_save_brigade */
+        ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
     }
 
-    if (new_bb != NULL) {
-        for (bucket = APR_BRIGADE_FIRST(new_bb); bucket != APR_BRIGADE_SENTINEL(new_bb); bucket = APR_BUCKET_NEXT(bucket)) {
-            if (bucket->length > 0) {
-                ctx->bytes_in += bucket->length;
-            }
-        }
-    }
+    if (new_bb != NULL)
+        bb = new_bb;
 
     if ((ctx->buffered_bb != NULL) &&
         !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
-        bb = ctx->buffered_bb;
-        ctx->buffered_bb = NULL;
         if (new_bb != NULL) {
-            APR_BRIGADE_CONCAT(bb, new_bb);
+            APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
+        }
+        else {
+            bb = ctx->buffered_bb;
         }
         c->data_in_output_filters = 0;
     }
-    else if (new_bb != NULL) {
-        bb = new_bb;
-    }
-    else {
+    else if (new_bb == NULL) {
         return APR_SUCCESS;
     }
 
     /* Scan through the brigade and decide whether to attempt a write,
-     * based on the following rules:
+     * and how much to write, based on the following rules:
      *
      *  1) The new_bb is null: Do a nonblocking write of as much as
      *     possible: do a nonblocking write of as much data as possible,
@@ -396,10 +428,13 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
      *     completion and has just determined that this connection
      *     is writable.)
      *
-     *  2) The brigade contains a flush bucket: Do a blocking write
+     *  2) Determine if and up to which bucket we need to do a blocking
+     *     write:
+     *
+     *  a) The brigade contains a flush bucket: Do a blocking write
      *     of everything up that point.
      *
-     *  3) The request is in CONN_STATE_HANLDER state, and the brigade
+     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
      *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
      *     buckets: Do blocking writes until the amount of data in the
      *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
@@ -407,104 +442,164 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
      *     streaming out lots of data faster than the data can be
      *     sent to the client.)
      *
-     *  4) The brigade contains at least THRESHOLD_MIN_WRITE
+     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
+     *     contains more than MAX_REQUESTS_IN_PIPELINE EOR buckets:
+     *     Do blocking writes until at most MAX_REQUESTS_IN_PIPELINE EOR
+     *     buckets are left. (The point of this rule is to prevent too many
+     *     FDs being kept open by pipelined requests, possibly allowing a
+     *     DoS).
+     *
+     *  d) The brigade contains a morphing bucket: If there was no other
+     *     reason to do a blocking write yet, try reading the bucket. If its
+     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
+     *     everything is fine. Otherwise we need to do a blocking write
+     *     up to and including the morphing bucket, because ap_save_brigade()
+     *     would read the whole bucket into memory later on.
+     *
+     *  3) Actually do the blocking write up to the last bucket determined
+     *     by rules 2a-d. The point of doing only one flush is to make as
+     *     few calls to writev() as possible.
+     *
+     *  4) If the brigade contains at least THRESHOLD_MIN_WRITE
      *     bytes: Do a nonblocking write of as much data as possible,
      *     then save the rest in ctx->buffered_bb.
      */
 
     if (new_bb == NULL) {
-        apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
-                                                   &(ctx->bytes_written), c);
+        rv = send_brigade_nonblocking(net->client_socket, bb,
+                                      &(ctx->bytes_written), c);
         if (APR_STATUS_IS_EAGAIN(rv)) {
             rv = APR_SUCCESS;
         }
-        setaside_remaining_output(f, ctx, bb, 0, c);
+        else if (rv != APR_SUCCESS) {
+            /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
+            c->aborted = 1;
+        }
+        setaside_remaining_output(f, ctx, bb, c);
         return rv;
     }
 
     bytes_in_brigade = 0;
     non_file_bytes_in_brigade = 0;
+    eor_buckets_in_brigade = 0;
+    morphing_bucket_in_brigade = 0;
+
     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
         next = APR_BUCKET_NEXT(bucket);
-        if (APR_BUCKET_IS_FLUSH(bucket)) {
-            apr_bucket_brigade *remainder = apr_brigade_split(bb, next);
-            apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
-                                                    &(ctx->bytes_written), c);
-            if (rv != APR_SUCCESS) {
-                return rv;
+
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length == (apr_size_t)-1) {
+                /*
+                 * A setaside of morphing buckets would read everything into
+                 * memory. Instead, we will flush everything up to and
+                 * including this bucket.
+                 */
+                morphing_bucket_in_brigade = 1;
             }
-            bb = remainder;
-            next = APR_BRIGADE_FIRST(bb);
-            bytes_in_brigade = 0;
-            non_file_bytes_in_brigade = 0;
-        }
-        else if (!APR_BUCKET_IS_METADATA(bucket)) {
-            if (bucket->length < 0) {
-                const char *data;
-                apr_size_t length;
-                /* XXX support nonblocking read here? */
-                apr_status_t rv =
-                    apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
-                if (rv != APR_SUCCESS) {
-                    return rv;
-                }
-                /* reading may have split the bucket, so recompute next: */
-                next = APR_BUCKET_NEXT(bucket);
+            else {
+                bytes_in_brigade += bucket->length;
+                if (!APR_BUCKET_IS_FILE(bucket))
+                    non_file_bytes_in_brigade += bucket->length;
             }
-            bytes_in_brigade += bucket->length;
-            if (!APR_BUCKET_IS_FILE(bucket)) {
-                non_file_bytes_in_brigade += bucket->length;
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
+            eor_buckets_in_brigade++;
+        }
+
+        if (APR_BUCKET_IS_FLUSH(bucket)
+            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+            || morphing_bucket_in_brigade
+            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+            /* this segment of the brigade MUST be sent before returning. */
+
+            if (APLOGctrace6(c)) {
+                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+                               "FLUSH bucket" :
+                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+                               "THRESHOLD_MAX_BUFFER" :
+                               morphing_bucket_in_brigade ? "morphing bucket" :
+                               "MAX_REQUESTS_IN_PIPELINE";
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
+                              "core_output_filter: flushing because of %s",
+                              reason);
             }
+            /*
+             * Defer the actual blocking write to avoid doing many writes.
+             */
+            flush_upto = next;
+
+            bytes_in_brigade = 0;
+            non_file_bytes_in_brigade = 0;
+            eor_buckets_in_brigade = 0;
+            morphing_bucket_in_brigade = 0;
         }
     }
 
-    if (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) {
-        /* ### Writing the entire brigade may be excessive; we really just
-         * ### need to send enough data to be under THRESHOLD_MAX_BUFFER.
-         */
-        apr_status_t rv = send_brigade_blocking(net->client_socket, bb,
-                                                &(ctx->bytes_written), c);
+    if (flush_upto != NULL) {
+        ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
+                                                 ctx->tmp_flush_bb);
+        rv = send_brigade_blocking(net->client_socket, bb,
+                                   &(ctx->bytes_written), c);
         if (rv != APR_SUCCESS) {
+            /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
+            c->aborted = 1;
             return rv;
         }
+        APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
     }
-    else if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
-        apr_status_t rv = send_brigade_nonblocking(net->client_socket, bb,
-                                                   &(ctx->bytes_written), c);
+
+    if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
+        rv = send_brigade_nonblocking(net->client_socket, bb,
+                                      &(ctx->bytes_written), c);
         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
+            /* The client has aborted the connection */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
+                          "core_output_filter: writing data to the network");
+            c->aborted = 1;
             return rv;
         }
     }
 
-    setaside_remaining_output(f, ctx, bb, 1, c);
+    setaside_remaining_output(f, ctx, bb, c);
     return APR_SUCCESS;
 }
 
+/*
+ * This function assumes that either ctx->buffered_bb == NULL, or
+ * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
+ */
 static void setaside_remaining_output(ap_filter_t *f,
                                       core_output_filter_ctx_t *ctx,
                                       apr_bucket_brigade *bb,
-                                      int make_a_copy, conn_rec *c)
+                                      conn_rec *c)
 {
     if (bb == NULL) {
         return;
     }
-    remove_empty_buckets(bb, c);
+    remove_empty_buckets(bb);
     if (!APR_BRIGADE_EMPTY(bb)) {
         c->data_in_output_filters = 1;
-        if (make_a_copy) {
-            /* XXX should this use a separate deferred write pool, like
-             * the original ap_core_output_filter?
-             */
-            ap_save_brigade(f, &(ctx->buffered_bb), &bb, c->pool);
-            apr_brigade_destroy(bb);
-        }
-        else {
-            ctx->buffered_bb = bb;
+        if (bb != ctx->buffered_bb) {
+            if (!ctx->deferred_write_pool) {
+                apr_pool_create(&ctx->deferred_write_pool, c->pool);
+                apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
+            }
+            ap_save_brigade(f, &(ctx->buffered_bb), &bb,
+                            ctx->deferred_write_pool);
+            apr_brigade_cleanup(bb);
         }
     }
-    else {
-        apr_brigade_destroy(bb);
+    else if (ctx->deferred_write_pool) {
+        /*
+         * There are no more requests in the pipeline. We can just clear the
+         * pool.
+         */
+        apr_pool_clear(ctx->deferred_write_pool);
     }
 }
 
@@ -528,12 +623,11 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
     struct iovec vec[MAX_IOVEC_TO_WRITE];
     apr_size_t nvec = 0;
 
-    remove_empty_buckets(bb, c);
+    remove_empty_buckets(bb);
 
     for (bucket = APR_BRIGADE_FIRST(bb);
          bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
-        int did_sendfile = 0;
         next = APR_BUCKET_NEXT(bucket);
 #if APR_HAS_SENDFILE
         if (APR_BUCKET_IS_FILE(bucket)) {
@@ -547,7 +641,6 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
 
             if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
                 (bucket->length >= AP_MIN_SENDFILE_BYTES)) {
-                did_sendfile = 1;
                 if (nvec > 0) {
                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
@@ -557,7 +650,7 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                         return rv;
                     }
                 }
-                rv = sendfile_nonblocking(s, bb, bytes_written, c);
+                rv = sendfile_nonblocking(s, bucket, bytes_written, c);
                 if (nvec > 0) {
                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
                 }
@@ -568,13 +661,30 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
             }
         }
 #endif /* APR_HAS_SENDFILE */
-        if (!did_sendfile && !APR_BUCKET_IS_METADATA(bucket)) {
+        /* didn't sendfile */
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
             const char *data;
             apr_size_t length;
-            rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+            
+            /* Non-blocking read first, in case this is a morphing
+             * bucket type. */
+            rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
+            if (APR_STATUS_IS_EAGAIN(rv)) {
+                /* Read would block; flush any pending data and retry. */
+                if (nvec) {
+                    rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
+                    if (rv) {
+                        return rv;
+                    }
+                    nvec = 0;
+                }
+                
+                rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
+            }
             if (rv != APR_SUCCESS) {
                 return rv;
             }
+
             /* reading may have split the bucket, so recompute next: */
             next = APR_BUCKET_NEXT(bucket);
             vec[nvec].iov_base = (char *)data;
@@ -598,28 +708,17 @@ static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
         }
     }
 
-    remove_empty_buckets(bb, c);
+    remove_empty_buckets(bb);
 
     return APR_SUCCESS;
 }
 
-static void detect_error_bucket(apr_bucket *bucket, conn_rec *c)
-{
-    if (AP_BUCKET_IS_ERROR(bucket)
-        && (((ap_bucket_error *)(bucket->data))->status == HTTP_BAD_GATEWAY)) {
-        /* stream aborted and we have not ended it yet */
-        c->keepalive = AP_CONN_CLOSE;
-    }
-}
-
-static void remove_empty_buckets(apr_bucket_brigade *bb, conn_rec *c)
+static void remove_empty_buckets(apr_bucket_brigade *bb)
 {
     apr_bucket *bucket;
     while (((bucket = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) &&
            (APR_BUCKET_IS_METADATA(bucket) || (bucket->length == 0))) {
-        detect_error_bucket(bucket, c);
-        APR_BUCKET_REMOVE(bucket);
-        apr_bucket_destroy(bucket);
+        apr_bucket_delete(bucket);
     }
 }
 
@@ -645,7 +744,9 @@ static apr_status_t send_brigade_blocking(apr_socket_t *s,
                 pollset.reqevents = APR_POLLOUT;
                 pollset.desc.s = s;
                 apr_socket_timeout_get(s, &timeout);
-                rv = apr_poll(&pollset, 1, &nsds, timeout);
+                do {
+                    rv = apr_poll(&pollset, 1, &nsds, timeout);
+                } while (APR_STATUS_IS_EINTR(rv));
                 if (rv != APR_SUCCESS) {
                     break;
                 }
@@ -690,22 +791,18 @@ static apr_status_t writev_nonblocking(apr_socket_t *s,
             for (i = offset; i < nvec; ) {
                 apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
                 if (APR_BUCKET_IS_METADATA(bucket)) {
-                    detect_error_bucket(bucket, c);
-                    APR_BUCKET_REMOVE(bucket);
-                    apr_bucket_destroy(bucket);
+                    apr_bucket_delete(bucket);
                 }
                 else if (n >= vec[i].iov_len) {
-                    APR_BUCKET_REMOVE(bucket);
-                    apr_bucket_destroy(bucket);
+                    apr_bucket_delete(bucket);
                     offset++;
                     n -= vec[i++].iov_len;
                 }
                 else {
                     apr_bucket_split(bucket, n);
-                    APR_BUCKET_REMOVE(bucket);
-                    apr_bucket_destroy(bucket);
+                    apr_bucket_delete(bucket);
                     vec[i].iov_len -= n;
-                    vec[i].iov_base += n;
+                    vec[i].iov_base = (char *) vec[i].iov_base + n;
                     break;
                 }
             }
@@ -714,8 +811,8 @@ static apr_status_t writev_nonblocking(apr_socket_t *s,
             break;
         }
     }
-    if ((logio_add_bytes_out != NULL) && (bytes_written > 0)) {
-        logio_add_bytes_out(c, bytes_written);
+    if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
+        ap__logio_add_bytes_out(c, bytes_written);
     }
     *cumulative_bytes_written += bytes_written;
 
@@ -731,21 +828,21 @@ static apr_status_t writev_nonblocking(apr_socket_t *s,
 #if APR_HAS_SENDFILE
 
 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
-                                         apr_bucket_brigade *bb,
+                                         apr_bucket *bucket,
                                          apr_size_t *cumulative_bytes_written,
                                          conn_rec *c)
 {
     apr_status_t rv = APR_SUCCESS;
-    apr_bucket *bucket;
     apr_bucket_file *file_bucket;
     apr_file_t *fd;
     apr_size_t file_length;
     apr_off_t file_offset;
     apr_size_t bytes_written = 0;
 
-    bucket = APR_BRIGADE_FIRST(bb);
     if (!APR_BUCKET_IS_FILE(bucket)) {
-        /* XXX log a "this should never happen" message */
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server, APLOGNO(00006)
+                     "core_filter: sendfile_nonblocking: "
+                     "this should never happen");
         return APR_EGENERAL;
     }
     file_bucket = (apr_bucket_file *)(bucket->data);
@@ -776,18 +873,16 @@ static apr_status_t sendfile_nonblocking(apr_socket_t *s,
             rv = arv;
         }
     }
-    if ((logio_add_bytes_out != NULL) && (bytes_written > 0)) {
-        logio_add_bytes_out(c, bytes_written);
+    if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
+        ap__logio_add_bytes_out(c, bytes_written);
     }
     *cumulative_bytes_written += bytes_written;
     if ((bytes_written < file_length) && (bytes_written > 0)) {
         apr_bucket_split(bucket, bytes_written);
-        APR_BUCKET_REMOVE(bucket);
-        apr_bucket_destroy(bucket);
+        apr_bucket_delete(bucket);
     }
     else if (bytes_written == file_length) {
-        APR_BUCKET_REMOVE(bucket);
-        apr_bucket_destroy(bucket);
+        apr_bucket_delete(bucket);
     }
     return rv;
 }