]> granicus.if.org Git - apache/commitdiff
core: core output filter optimizations.
authorYann Ylavic <ylavic@apache.org>
Wed, 18 Jul 2018 21:55:29 +0000 (21:55 +0000)
committerYann Ylavic <ylavic@apache.org>
Wed, 18 Jul 2018 21:55:29 +0000 (21:55 +0000)
The core output filter used to determine first if it needed to block before
trying to send its data (including set aside ones), and if so it did call
send_brigade_blocking().

This can be avoided by making send_brigade_nonblocking() send as much data as
possible (nonblocking), and only if data remain check whether they should be
flushed (blocking), according to the same ap_filter_reinstate_brigade()
heuristics but afterward.

This allows both to simplify the code (axe send_brigade_blocking and some
duplicated logic) and optimize sends since send_brigade_nonblocking() is now
given all the buckets so it can make use of scatter/gather (iovec) or NOPUSH
option with the whole picture.

When sendfile is available and/or with fine tuning of FlushMaxThreshold (and
ReadBufferSize) from r1836032, one can now take advantage of modern network
speeds and bandwidth.

This commit also adds some APLOG_TRACE6 messages for outputed bytes (including
at mod_ssl level since splitting happens there when it's active).

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1836237 13f79535-47bb-0310-9956-ffa450edef68

modules/ssl/ssl_engine_io.c
server/core_filters.c
server/util_filter.c

index 1079d1598eaab463656085cfa8b2f7d4d41ad18c..82315833482645baf9887d2d8f7a13a78ce77edc 100644 (file)
@@ -153,6 +153,9 @@ static int bio_filter_out_flush(BIO *bio)
     bio_filter_out_ctx_t *outctx = (bio_filter_out_ctx_t *)BIO_get_data(bio);
     apr_bucket *e;
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, outctx->c,
+                  "bio_filter_out_write: flush");
+
     AP_DEBUG_ASSERT(APR_BRIGADE_EMPTY(outctx->bb));
 
     e = apr_bucket_flush_create(outctx->bb->bucket_alloc);
@@ -211,6 +214,9 @@ static int bio_filter_out_write(BIO *bio, const char *in, int inl)
         return -1;
     }
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, outctx->c,
+                  "bio_filter_out_write: %i bytes", inl);
+
     /* when handshaking we'll have a small number of bytes.
      * max size SSL will pass us here is about 16k.
      * (16413 bytes to be exact)
@@ -872,6 +878,9 @@ static apr_status_t ssl_filter_write(ap_filter_t *f,
         return APR_EGENERAL;
     }
 
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
+                  "ssl_filter_write: %"APR_SIZE_T_FMT" bytes", len);
+
     /* We rely on SSL_get_error() after the write, which requires an empty error
      * queue before the write in order to work properly.
      */
@@ -1670,8 +1679,11 @@ static apr_status_t ssl_io_filter_coalesce(ap_filter_t *f,
              && (ctx == NULL
                  || bytes + ctx->bytes + e->length < COALESCE_BYTES);
          e = APR_BUCKET_NEXT(e)) {
-        if (e->length) count++; /* don't count zero-length buckets */
-        bytes += e->length;
+        /* don't count zero-length buckets */
+        if (e->length) {
+            bytes += e->length;
+            count++;
+        }
     }
     upto = e;
 
index e0a38ddb37e5f7b38769515b9e933c4cbee04418..a3ae9bef3f6b007c31a8ca5a838c5c275e12fc77 100644 (file)
@@ -81,6 +81,8 @@ struct core_output_filter_ctx {
     apr_bucket_brigade *tmp_flush_bb;
     apr_bucket_brigade *empty_bb;
     apr_size_t bytes_written;
+    struct iovec *vec;
+    apr_size_t nvec;
 };
 
 struct core_filter_ctx {
@@ -328,26 +330,20 @@ apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
 
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
-                                             apr_size_t *bytes_written,
+                                             core_output_filter_ctx_t *ctx,
                                              conn_rec *c);
 
-static void remove_empty_buckets(apr_bucket_brigade *bb);
-
-static apr_status_t send_brigade_blocking(apr_socket_t *s,
-                                          apr_bucket_brigade *bb,
-                                          apr_size_t *bytes_written,
-                                          conn_rec *c);
-
 static apr_status_t writev_nonblocking(apr_socket_t *s,
-                                       struct iovec *vec, apr_size_t nvec,
                                        apr_bucket_brigade *bb,
-                                       apr_size_t *cumulative_bytes_written,
+                                       core_output_filter_ctx_t *ctx,
+                                       apr_size_t bytes_to_write,
+                                       apr_size_t nvec,
                                        conn_rec *c);
 
 #if APR_HAS_SENDFILE
 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
                                          apr_bucket *bucket,
-                                         apr_size_t *cumulative_bytes_written,
+                                         core_output_filter_ctx_t *ctx,
                                          conn_rec *c);
 #endif
 
@@ -360,10 +356,10 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
+    apr_socket_t *sock = net->client_socket;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket *flush_upto = NULL;
+    apr_interval_time_t sock_timeout = 0;
     apr_status_t rv;
-    int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
 
     /* Fail quickly if the connection has already been aborted. */
     if (c->aborted) {
@@ -390,83 +386,45 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
         bb = ctx->empty_bb;
     }
 
-    /* Scan through the brigade and decide whether to attempt a write,
-     * and how much to write, based on the following rules:
-     *
-     *  1) The bb is empty: Do a nonblocking write of as much as
-     *     possible: do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb.  (If bb is empty,
-     *     it probably means that the MPM is doing asynchronous write
-     *     completion and has just determined that this connection
-     *     is writable.)
-     *
-     *  2) Determine if and up to which bucket we need to do a blocking
-     *     write:
-     *
-     *  a) The brigade contains a flush bucket: Do a blocking write
-     *     of everything up that point.
-     *
-     *  b) The request is in CONN_STATE_HANDLER state, and the brigade
-     *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
-     *     buckets: Do blocking writes until the amount of data in the
-     *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
-     *     rule is to provide flow control, in case a handler is
-     *     streaming out lots of data faster than the data can be
-     *     sent to the client.)
-     *
-     *  c) The request is in CONN_STATE_HANDLER state, and the brigade
-     *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
-     *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
-     *     buckets are left. (The point of this rule is to prevent too many
-     *     FDs being kept open by pipelined requests, possibly allowing a
-     *     DoS).
-     *
-     *  d) The brigade contains a morphing bucket: If there was no other
-     *     reason to do a blocking write yet, try reading the bucket. If its
-     *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
-     *     everything is fine. Otherwise we need to do a blocking write the
-     *     up to and including the morphing bucket, because ap_save_brigade()
-     *     would read the whole bucket into memory later on.
-     *
-     *  3) Actually do the blocking write up to the last bucket determined
-     *     by rules 2a-d. The point of doing only one flush is to make as
-     *     few calls to writev() as possible.
-     */
-
-    ap_filter_reinstate_brigade(f, bb, &flush_upto);
-
+    /* Prepend buckets set aside, if any. */
+    ap_filter_reinstate_brigade(f, bb, NULL);
     if (APR_BRIGADE_EMPTY(bb)) {
         return APR_SUCCESS;
     }
 
-    if (flush_upto != NULL) {
-        ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
-                                                 ctx->tmp_flush_bb);
-        if (loglevel >= APLOG_TRACE8) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "flushing now");
-        }
-        rv = send_brigade_blocking(net->client_socket, bb,
-                                   &(ctx->bytes_written), c);
-        if (rv != APR_SUCCESS) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            apr_brigade_cleanup(bb);
-            c->aborted = 1;
-            return rv;
-        }
-        if (loglevel >= APLOG_TRACE8) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "total bytes written: %" APR_SIZE_T_FMT,
-                              ctx->bytes_written);
+    /* Non-blocking writes on the socket in any case. */
+    apr_socket_timeout_get(sock, &sock_timeout);
+    apr_socket_timeout_set(sock, 0);
+
+    do {
+        rv = send_brigade_nonblocking(sock, bb, ctx, c);
+        if (APR_STATUS_IS_EAGAIN(rv)) {
+            /* Scan through the brigade and decide whether we must absolutely
+             * flush the remaining data, based on ap_filter_reinstate_brigade()
+             * rules. If so, wait for writability and retry, otherwise we did
+             * our best already and can wait for the next call.
+             */
+            apr_bucket *flush_upto;
+            ap_filter_reinstate_brigade(f, bb, &flush_upto);
+            if (flush_upto) {
+                apr_int32_t nfd;
+                apr_pollfd_t pfd;
+                memset(&pfd, 0, sizeof(pfd));
+                pfd.reqevents = APR_POLLOUT;
+                pfd.desc_type = APR_POLL_SOCKET;
+                pfd.desc.s = sock;
+                pfd.p = c->pool;
+                do {
+                    rv = apr_poll(&pfd, 1, &nfd, sock_timeout);
+                } while (APR_STATUS_IS_EINTR(rv));
+            }
         }
-        APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
-    }
+    } while (rv == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb));
 
-    rv = send_brigade_nonblocking(net->client_socket, bb, &(ctx->bytes_written),
-            c);
-    if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
+    /* Restore original socket timeout before leaving. */
+    apr_socket_timeout_set(sock, sock_timeout);
+
+    if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
         /* The client has aborted the connection */
         ap_log_cerror(
                 APLOG_MARK, APLOG_TRACE1, rv, c,
@@ -475,299 +433,267 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
         c->aborted = 1;
         return rv;
     }
-    if (loglevel >= APLOG_TRACE8) {
-        ap_log_cerror(
-                APLOG_MARK, APLOG_TRACE8, 0, c,
-                "tried nonblocking write, total bytes "
-                "written: %" APR_SIZE_T_FMT, ctx->bytes_written);
-    }
-
-    remove_empty_buckets(bb);
-    ap_filter_setaside_brigade(f, bb);
 
-    return APR_SUCCESS;
+    return ap_filter_setaside_brigade(f, bb);
 }
 
 #ifndef APR_MAX_IOVEC_SIZE
-#define MAX_IOVEC_TO_WRITE 16
+#define NVEC_MIN 16
+#define NVEC_MAX NVEC_MIN
 #else
 #if APR_MAX_IOVEC_SIZE > 16
-#define MAX_IOVEC_TO_WRITE 16
+#define NVEC_MIN 16
 #else
-#define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE
+#define NVEC_MIN APR_MAX_IOVEC_SIZE
+#endif
+#define NVEC_MAX APR_MAX_IOVEC_SIZE
 #endif
+
+static APR_INLINE int is_in_memory_bucket(apr_bucket *b)
+{
+    /* The bucket data are already in memory unless:
+     *   - it's a morphing bucket (heap buffers allocated on read), or
+     *   - it's a file bucket (heap buffers also allocated on read), or
+     *   - it's a mmap bucket (mapping happens over memory access usually).
+     */
+    return b->length != (apr_size_t)-1
+           && !APR_BUCKET_IS_FILE(b)
+           && !APR_BUCKET_IS_MMAP(b);
+}
+
+#if APR_HAS_SENDFILE
+static APR_INLINE int can_sendfile_bucket(apr_bucket *b)
+{
+    /* Use sendfile to send the bucket unless:
+     *   - the bucket is not a file bucket, or
+     *   - the file is too small for sendfile to be useful, or
+     *   - sendfile is disabled in the httpd config via "EnableSendfile off".
+     */
+    if (APR_BUCKET_IS_FILE(b) && b->length >= AP_MIN_SENDFILE_BYTES) {
+        apr_file_t *file = ((apr_bucket_file *)b->data)->fd;
+        return apr_file_flags_get(file) & APR_SENDFILE_ENABLED;
+    }
+    else {
+        return 0;
+    }
+}
 #endif
 
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
-                                             apr_size_t *bytes_written,
+                                             core_output_filter_ctx_t *ctx,
                                              conn_rec *c)
 {
+    apr_status_t rv = APR_SUCCESS;
+    core_server_config *conf =
+        ap_get_core_module_config(c->base_server->module_config);
+    apr_size_t nvec = 0, nbytes = 0;
     apr_bucket *bucket, *next;
-    apr_status_t rv;
-    struct iovec vec[MAX_IOVEC_TO_WRITE];
-    apr_size_t nvec = 0;
-
-    remove_empty_buckets(bb);
+    const char *data;
+    apr_size_t length;
 
     for (bucket = APR_BRIGADE_FIRST(bb);
          bucket != APR_BRIGADE_SENTINEL(bb);
          bucket = next) {
         next = APR_BUCKET_NEXT(bucket);
-#if APR_HAS_SENDFILE
-        if (APR_BUCKET_IS_FILE(bucket)) {
-            apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
-            apr_file_t *fd = file_bucket->fd;
-            /* Use sendfile to send this file unless:
-             *   - the platform doesn't support sendfile,
-             *   - the file is too small for sendfile to be useful, or
-             *   - sendfile is disabled in the httpd config via "EnableSendfile off"
-             */
 
-            if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
-                (bucket->length >= AP_MIN_SENDFILE_BYTES)) {
-                if (nvec > 0) {
-                    (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
-                    rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
-                    if (rv != APR_SUCCESS) {
-                        (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
-                        return rv;
-                    }
-                }
-                rv = sendfile_nonblocking(s, bucket, bytes_written, c);
-                if (nvec > 0) {
-                    (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
-                    nvec = 0;
-                }
+#if APR_HAS_SENDFILE
+        if (can_sendfile_bucket(bucket)) {
+            if (nvec > 0) {
+                (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+                rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
                 if (rv != APR_SUCCESS) {
-                    return rv;
+                    goto cleanup;
                 }
-                break;
+                nbytes = 0;
+                nvec = 0;
             }
+            rv = sendfile_nonblocking(s, bucket, ctx, c);
+            if (rv != APR_SUCCESS) {
+                goto cleanup;
+            }
+            continue;
         }
 #endif /* APR_HAS_SENDFILE */
-        /* didn't sendfile */
-        if (!APR_BUCKET_IS_METADATA(bucket)) {
-            const char *data;
-            apr_size_t length;
-            
+
+        if (bucket->length) {
             /* Non-blocking read first, in case this is a morphing
              * bucket type. */
             rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
             if (APR_STATUS_IS_EAGAIN(rv)) {
                 /* Read would block; flush any pending data and retry. */
                 if (nvec) {
-                    rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
-                    if (rv) {
-                        return rv;
+                    rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
+                    if (rv != APR_SUCCESS) {
+                        goto cleanup;
                     }
+                    nbytes = 0;
                     nvec = 0;
                 }
-                
+                (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
+
                 rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
             }
             if (rv != APR_SUCCESS) {
-                return rv;
+                goto cleanup;
             }
 
             /* reading may have split the bucket, so recompute next: */
             next = APR_BUCKET_NEXT(bucket);
-            vec[nvec].iov_base = (char *)data;
-            vec[nvec].iov_len = length;
-            nvec++;
-            if (nvec == MAX_IOVEC_TO_WRITE) {
-                rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
-                nvec = 0;
-                if (rv != APR_SUCCESS) {
-                    return rv;
-                }
-                break;
-            }
         }
-    }
 
-    if (nvec > 0) {
-        rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
-        if (rv != APR_SUCCESS) {
-            return rv;
+        if (!bucket->length) {
+            /* Don't delete empty buckets until all the previous ones have been
+             * sent (nvec == 0); this must happen in sequence since metabuckets
+             * like EOR could free the data still pointed to by the iovec. So
+             * unless the latter is empty, let writev_nonblocking() cleanup the
+             * brigade in order.
+             */
+            if (!nvec) {
+                apr_bucket_delete(bucket);
+            }
+            continue;
         }
-    }
-
-    remove_empty_buckets(bb);
-
-    return APR_SUCCESS;
-}
-
-static void remove_empty_buckets(apr_bucket_brigade *bb)
-{
-    apr_bucket *bucket;
-    while (((bucket = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) &&
-           (APR_BUCKET_IS_METADATA(bucket) || (bucket->length == 0))) {
-        apr_bucket_delete(bucket);
-    }
-}
-
-static apr_status_t send_brigade_blocking(apr_socket_t *s,
-                                          apr_bucket_brigade *bb,
-                                          apr_size_t *bytes_written,
-                                          conn_rec *c)
-{
-    apr_status_t rv;
 
-    rv = APR_SUCCESS;
-    while (!APR_BRIGADE_EMPTY(bb)) {
-        rv = send_brigade_nonblocking(s, bb, bytes_written, c);
-        if (rv != APR_SUCCESS) {
-            if (APR_STATUS_IS_EAGAIN(rv)) {
-                /* Wait until we can send more data */
-                apr_int32_t nsds;
-                apr_interval_time_t timeout;
-                apr_pollfd_t pollset;
-
-                pollset.p = c->pool;
-                pollset.desc_type = APR_POLL_SOCKET;
-                pollset.reqevents = APR_POLLOUT;
-                pollset.desc.s = s;
-                apr_socket_timeout_get(s, &timeout);
-                do {
-                    rv = apr_poll(&pollset, 1, &nsds, timeout);
-                } while (APR_STATUS_IS_EINTR(rv));
+        /* Make sure that these new data fit in our iovec. */
+        if (nvec == ctx->nvec) {
+            if (nvec == NVEC_MAX) {
+                (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+                rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
                 if (rv != APR_SUCCESS) {
-                    break;
+                    goto cleanup;
                 }
+                nbytes = 0;
+                nvec = 0;
             }
             else {
-                break;
+                struct iovec *newvec;
+                apr_size_t newn = nvec * 2;
+                if (newn < NVEC_MIN) {
+                    newn = NVEC_MIN;
+                }
+                else if (newn > NVEC_MAX) {
+                    newn = NVEC_MAX;
+                }
+                newvec = apr_palloc(c->pool, newn * sizeof(struct iovec));
+                if (nvec) {
+                    memcpy(newvec, ctx->vec, nvec * sizeof(struct iovec));
+                }
+                ctx->vec = newvec;
+                ctx->nvec = newn;
             }
         }
+        nbytes += length;
+        ctx->vec[nvec].iov_base = (void *)data;
+        ctx->vec[nvec].iov_len = length;
+        nvec++;
+
+        /* Flush above max threshold, unless the brigade still contains in
+         * memory buckets which we want to try writing in the same pass (if
+         * we are at the end of the brigade, the write will happen outside
+         * the loop anyway).
+         */
+        if (nbytes >= conf->flush_max_threshold
+                && next != APR_BRIGADE_SENTINEL(bb)
+                && !is_in_memory_bucket(next)) {
+            (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
+            rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
+            if (rv != APR_SUCCESS) {
+                goto cleanup;
+            }
+            nbytes = 0;
+            nvec = 0;
+        }
+    }
+    if (nvec > 0) {
+        rv = writev_nonblocking(s, bb, ctx, nbytes, nvec, c);
     }
+
+cleanup:
+    (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
     return rv;
 }
 
 static apr_status_t writev_nonblocking(apr_socket_t *s,
-                                       struct iovec *vec, apr_size_t nvec,
                                        apr_bucket_brigade *bb,
-                                       apr_size_t *cumulative_bytes_written,
+                                       core_output_filter_ctx_t *ctx,
+                                       apr_size_t bytes_to_write,
+                                       apr_size_t nvec,
                                        conn_rec *c)
 {
-    apr_status_t rv = APR_SUCCESS, arv;
-    apr_size_t bytes_written = 0, bytes_to_write = 0;
-    apr_size_t i, offset;
-    apr_interval_time_t old_timeout;
-
-    arv = apr_socket_timeout_get(s, &old_timeout);
-    if (arv != APR_SUCCESS) {
-        return arv;
-    }
-    arv = apr_socket_timeout_set(s, 0);
-    if (arv != APR_SUCCESS) {
-        return arv;
-    }
+    apr_status_t rv;
+    struct iovec *vec = ctx->vec;
+    apr_size_t bytes_written = 0;
+    apr_size_t i, offset = 0;
 
-    for (i = 0; i < nvec; i++) {
-        bytes_to_write += vec[i].iov_len;
-    }
-    offset = 0;
-    while (bytes_written < bytes_to_write) {
+    do {
         apr_size_t n = 0;
         rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n);
-        if (n > 0) {
-            bytes_written += n;
-            for (i = offset; i < nvec; ) {
-                apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
-                if (APR_BUCKET_IS_METADATA(bucket)) {
-                    apr_bucket_delete(bucket);
-                }
-                else if (n >= vec[i].iov_len) {
-                    apr_bucket_delete(bucket);
-                    offset++;
-                    n -= vec[i++].iov_len;
-                }
-                else {
+        bytes_written += n;
+
+        for (i = offset; i < nvec; ) {
+            apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
+            if (!bucket->length) {
+                apr_bucket_delete(bucket);
+            }
+            else if (n >= vec[i].iov_len) {
+                apr_bucket_delete(bucket);
+                n -= vec[i++].iov_len;
+                offset++;
+            }
+            else {
+                if (n) {
                     apr_bucket_split(bucket, n);
                     apr_bucket_delete(bucket);
                     vec[i].iov_len -= n;
                     vec[i].iov_base = (char *) vec[i].iov_base + n;
-                    break;
                 }
+                break;
             }
         }
-        if (rv != APR_SUCCESS) {
-            break;
-        }
-    }
+    } while (rv == APR_SUCCESS && bytes_written < bytes_to_write);
+
     if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
         ap__logio_add_bytes_out(c, bytes_written);
     }
-    *cumulative_bytes_written += bytes_written;
+    ctx->bytes_written += bytes_written;
 
-    arv = apr_socket_timeout_set(s, old_timeout);
-    if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
-        return arv;
-    }
-    else {
-        return rv;
-    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c,
+                  "writev_nonblocking: %"APR_SIZE_T_FMT"/%"APR_SIZE_T_FMT,
+                  bytes_written, bytes_to_write);
+    return rv;
 }
 
 #if APR_HAS_SENDFILE
 
 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
                                          apr_bucket *bucket,
-                                         apr_size_t *cumulative_bytes_written,
+                                         core_output_filter_ctx_t *ctx,
                                          conn_rec *c)
 {
-    apr_status_t rv = APR_SUCCESS;
-    apr_bucket_file *file_bucket;
-    apr_file_t *fd;
-    apr_size_t file_length;
-    apr_off_t file_offset;
-    apr_size_t bytes_written = 0;
+    apr_status_t rv;
+    apr_file_t *file = ((apr_bucket_file *)bucket->data)->fd;
+    apr_size_t bytes_written = bucket->length; /* bytes_to_write for now */
+    apr_off_t file_offset = bucket->start;
 
-    if (!APR_BUCKET_IS_FILE(bucket)) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server, APLOGNO(00006)
-                     "core_filter: sendfile_nonblocking: "
-                     "this should never happen");
-        return APR_EGENERAL;
-    }
-    file_bucket = (apr_bucket_file *)(bucket->data);
-    fd = file_bucket->fd;
-    file_length = bucket->length;
-    file_offset = bucket->start;
-
-    if (bytes_written < file_length) {
-        apr_size_t n = file_length - bytes_written;
-        apr_status_t arv;
-        apr_interval_time_t old_timeout;
-
-        arv = apr_socket_timeout_get(s, &old_timeout);
-        if (arv != APR_SUCCESS) {
-            return arv;
-        }
-        arv = apr_socket_timeout_set(s, 0);
-        if (arv != APR_SUCCESS) {
-            return arv;
-        }
-        rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0);
-        if (rv == APR_SUCCESS) {
-            bytes_written += n;
-            file_offset += n;
-        }
-        arv = apr_socket_timeout_set(s, old_timeout);
-        if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
-            rv = arv;
-        }
-    }
+    rv = apr_socket_sendfile(s, file, NULL, &file_offset, &bytes_written, 0);
     if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
         ap__logio_add_bytes_out(c, bytes_written);
     }
-    *cumulative_bytes_written += bytes_written;
-    if ((bytes_written < file_length) && (bytes_written > 0)) {
-        apr_bucket_split(bucket, bytes_written);
+    ctx->bytes_written += bytes_written;
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, rv, c,
+                  "sendfile_nonblocking: %" APR_SIZE_T_FMT "/%" APR_SIZE_T_FMT,
+                  bytes_written, bucket->length);
+    if (bytes_written >= bucket->length) {
         apr_bucket_delete(bucket);
     }
-    else if (bytes_written == file_length) {
+    else if (bytes_written > 0) {
+        apr_bucket_split(bucket, bytes_written);
         apr_bucket_delete(bucket);
+        if (rv == APR_SUCCESS) {
+            rv = APR_EAGAIN;
+        }
     }
     return rv;
 }
index 04c637a8a15a12fec4975e43abd73faa4e6adde0..470547c37eb90c4564fa1126ae36be079e886e1e 100644 (file)
@@ -834,6 +834,12 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
     if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
         APR_BRIGADE_PREPEND(bb, f->bb);
     }
+    if (!flush_upto) {
+        /* Just prepend all. */
+        return APR_SUCCESS;
+    }
+    *flush_upto = NULL;
 
     /*
      * Determine if and up to which bucket we need to do a blocking write:
@@ -865,8 +871,6 @@ AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
      *     would read the whole bucket into memory later on.
      */
 
-    *flush_upto = NULL;
-
     bytes_in_brigade = 0;
     non_file_bytes_in_brigade = 0;
     eor_buckets_in_brigade = 0;