]> granicus.if.org Git - apache/blob - server/core_filters.c
core_filters: restore/disable TCP_NOPUSH option after non-blocking sendfile.
[apache] / server / core_filters.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 /**
18  * @file  core_filters.c
19  * @brief Core input/output network filters.
20  */
21
22 #include "apr.h"
23 #include "apr_strings.h"
24 #include "apr_lib.h"
25 #include "apr_fnmatch.h"
26 #include "apr_hash.h"
27 #include "apr_thread_proc.h"    /* for RLIMIT stuff */
28
29 #define APR_WANT_IOVEC
30 #define APR_WANT_STRFUNC
31 #define APR_WANT_MEMFUNC
32 #include "apr_want.h"
33
34 #include "ap_config.h"
35 #include "httpd.h"
36 #include "http_config.h"
37 #include "http_core.h"
38 #include "http_protocol.h" /* For index_of_response().  Grump. */
39 #include "http_request.h"
40 #include "http_vhost.h"
41 #include "http_main.h"     /* For the default_handler below... */
42 #include "http_log.h"
43 #include "util_md5.h"
44 #include "http_connection.h"
45 #include "apr_buckets.h"
46 #include "util_filter.h"
47 #include "util_ebcdic.h"
48 #include "mpm_common.h"
49 #include "scoreboard.h"
50 #include "mod_core.h"
51 #include "ap_listen.h"
52
53 #include "mod_so.h" /* for ap_find_loaded_module_symbol */
54
55 #define AP_MIN_SENDFILE_BYTES           (256)
56
57 /**
58  * Remove all zero length buckets from the brigade.
59  */
60 #define BRIGADE_NORMALIZE(b) \
61 do { \
62     apr_bucket *e = APR_BRIGADE_FIRST(b); \
63     do {  \
64         if (e->length == 0 && !APR_BUCKET_IS_METADATA(e)) { \
65             apr_bucket *d; \
66             d = APR_BUCKET_NEXT(e); \
67             apr_bucket_delete(e); \
68             e = d; \
69         } \
70         else { \
71             e = APR_BUCKET_NEXT(e); \
72         } \
73     } while (!APR_BRIGADE_EMPTY(b) && (e != APR_BRIGADE_SENTINEL(b))); \
74 } while (0)
75
76 /* we know core's module_index is 0 */
77 #undef APLOG_MODULE_INDEX
78 #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
79
80 struct core_output_filter_ctx {
81     apr_bucket_brigade *buffered_bb;
82     apr_bucket_brigade *tmp_flush_bb;
83     apr_pool_t *deferred_write_pool;
84     apr_size_t bytes_written;
85 };
86
87 struct core_filter_ctx {
88     apr_bucket_brigade *b;
89     apr_bucket_brigade *tmpbb;
90 };
91
92
93 apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
94                                   ap_input_mode_t mode, apr_read_type_e block,
95                                   apr_off_t readbytes)
96 {
97     apr_status_t rv;
98     core_net_rec *net = f->ctx;
99     core_ctx_t *ctx = net->in_ctx;
100     const char *str;
101     apr_size_t len;
102
103     if (mode == AP_MODE_INIT) {
104         /*
105          * this mode is for filters that might need to 'initialize'
106          * a connection before reading request data from a client.
107          * NNTP over SSL for example needs to handshake before the
108          * server sends the welcome message.
109          * such filters would have changed the mode before this point
110          * is reached.  however, protocol modules such as NNTP should
111          * not need to know anything about SSL.  given the example, if
112          * SSL is not in the filter chain, AP_MODE_INIT is a noop.
113          */
114         return APR_SUCCESS;
115     }
116
117     if (!ctx)
118     {
119         net->in_ctx = ctx = apr_palloc(f->c->pool, sizeof(*ctx));
120         ctx->b = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
121         ctx->tmpbb = apr_brigade_create(f->c->pool, f->c->bucket_alloc);
122         /* seed the brigade with the client socket. */
123         rv = ap_run_insert_network_bucket(f->c, ctx->b, net->client_socket);
124         if (rv != APR_SUCCESS)
125             return rv;
126     }
127     else if (APR_BRIGADE_EMPTY(ctx->b)) {
128         return APR_EOF;
129     }
130
131     /* ### This is bad. */
132     BRIGADE_NORMALIZE(ctx->b);
133
134     /* check for empty brigade again *AFTER* BRIGADE_NORMALIZE()
135      * If we have lost our socket bucket (see above), we are EOF.
136      *
137      * Ideally, this should be returning SUCCESS with EOS bucket, but
138      * some higher-up APIs (spec. read_request_line via ap_rgetline)
139      * want an error code. */
140     if (APR_BRIGADE_EMPTY(ctx->b)) {
141         return APR_EOF;
142     }
143
144     if (mode == AP_MODE_GETLINE) {
145         /* we are reading a single LF line, e.g. the HTTP headers */
146         rv = apr_brigade_split_line(b, ctx->b, block, HUGE_STRING_LEN);
147         /* We should treat EAGAIN here the same as we do for EOF (brigade is
148          * empty).  We do this by returning whatever we have read.  This may
149          * or may not be bogus, but is consistent (for now) with EOF logic.
150          */
151         if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
152             rv = APR_SUCCESS;
153         }
154         return rv;
155     }
156
157     /* ### AP_MODE_PEEK is a horrific name for this mode because we also
158      * eat any CRLFs that we see.  That's not the obvious intention of
159      * this mode.  Determine whether anyone actually uses this or not. */
160     if (mode == AP_MODE_EATCRLF) {
161         apr_bucket *e;
162         const char *c;
163
164         /* The purpose of this loop is to ignore any CRLF (or LF) at the end
165          * of a request.  Many browsers send extra lines at the end of POST
166          * requests.  We use the PEEK method to determine if there is more
167          * data on the socket, so that we know if we should delay sending the
168          * end of one request until we have served the second request in a
169          * pipelined situation.  We don't want to actually delay sending a
170          * response if the server finds a CRLF (or LF), becuause that doesn't
171          * mean that there is another request, just a blank line.
172          */
173         while (1) {
174             if (APR_BRIGADE_EMPTY(ctx->b))
175                 return APR_EOF;
176
177             e = APR_BRIGADE_FIRST(ctx->b);
178
179             rv = apr_bucket_read(e, &str, &len, APR_NONBLOCK_READ);
180
181             if (rv != APR_SUCCESS)
182                 return rv;
183
184             c = str;
185             while (c < str + len) {
186                 if (*c == APR_ASCII_LF)
187                     c++;
188                 else if (*c == APR_ASCII_CR && *(c + 1) == APR_ASCII_LF)
189                     c += 2;
190                 else
191                     return APR_SUCCESS;
192             }
193
194             /* If we reach here, we were a bucket just full of CRLFs, so
195              * just toss the bucket. */
196             /* FIXME: Is this the right thing to do in the core? */
197             apr_bucket_delete(e);
198         }
199         return APR_SUCCESS;
200     }
201
202     /* If mode is EXHAUSTIVE, we want to just read everything until the end
203      * of the brigade, which in this case means the end of the socket.
204      * To do this, we attach the brigade that has currently been setaside to
205      * the brigade that was passed down, and send that brigade back.
206      *
207      * NOTE:  This is VERY dangerous to use, and should only be done with
208      * extreme caution.  FWLIW, this would be needed by an MPM like Perchild;
209      * such an MPM can easily request the socket and all data that has been
210      * read, which means that it can pass it to the correct child process.
211      */
212     if (mode == AP_MODE_EXHAUSTIVE) {
213         apr_bucket *e;
214
215         /* Tack on any buckets that were set aside. */
216         APR_BRIGADE_CONCAT(b, ctx->b);
217
218         /* Since we've just added all potential buckets (which will most
219          * likely simply be the socket bucket) we know this is the end,
220          * so tack on an EOS too. */
221         /* We have read until the brigade was empty, so we know that we
222          * must be EOS. */
223         e = apr_bucket_eos_create(f->c->bucket_alloc);
224         APR_BRIGADE_INSERT_TAIL(b, e);
225         return APR_SUCCESS;
226     }
227
228     /* read up to the amount they specified. */
229     if (mode == AP_MODE_READBYTES || mode == AP_MODE_SPECULATIVE) {
230         apr_bucket *e;
231
232         AP_DEBUG_ASSERT(readbytes > 0);
233
234         e = APR_BRIGADE_FIRST(ctx->b);
235         rv = apr_bucket_read(e, &str, &len, block);
236
237         if (APR_STATUS_IS_EAGAIN(rv) && block == APR_NONBLOCK_READ) {
238             /* getting EAGAIN for a blocking read is an error; for a
239              * non-blocking read, return an empty brigade. */
240             return APR_SUCCESS;
241         }
242         else if (rv != APR_SUCCESS) {
243             return rv;
244         }
245         else if (block == APR_BLOCK_READ && len == 0) {
246             /* We wanted to read some bytes in blocking mode.  We read
247              * 0 bytes.  Hence, we now assume we are EOS.
248              *
249              * When we are in normal mode, return an EOS bucket to the
250              * caller.
251              * When we are in speculative mode, leave ctx->b empty, so
252              * that the next call returns an EOS bucket.
253              */
254             apr_bucket_delete(e);
255
256             if (mode == AP_MODE_READBYTES) {
257                 e = apr_bucket_eos_create(f->c->bucket_alloc);
258                 APR_BRIGADE_INSERT_TAIL(b, e);
259             }
260             return APR_SUCCESS;
261         }
262
263         /* Have we read as much data as we wanted (be greedy)? */
264         if (len < readbytes) {
265             apr_size_t bucket_len;
266
267             rv = APR_SUCCESS;
268             /* We already registered the data in e in len */
269             e = APR_BUCKET_NEXT(e);
270             while ((len < readbytes) && (rv == APR_SUCCESS)
271                    && (e != APR_BRIGADE_SENTINEL(ctx->b))) {
272                 /* Check for the availability of buckets with known length */
273                 if (e->length != -1) {
274                     len += e->length;
275                     e = APR_BUCKET_NEXT(e);
276                 }
277                 else {
278                     /*
279                      * Read from bucket, but non blocking. If there isn't any
280                      * more data, well than this is fine as well, we will
281                      * not wait for more since we already got some and we are
282                      * only checking if there isn't more.
283                      */
284                     rv = apr_bucket_read(e, &str, &bucket_len,
285                                          APR_NONBLOCK_READ);
286                     if (rv == APR_SUCCESS) {
287                         len += bucket_len;
288                         e = APR_BUCKET_NEXT(e);
289                     }
290                 }
291             }
292         }
293
294         /* We can only return at most what we read. */
295         if (len < readbytes) {
296             readbytes = len;
297         }
298
299         rv = apr_brigade_partition(ctx->b, readbytes, &e);
300         if (rv != APR_SUCCESS) {
301             return rv;
302         }
303
304         /* Must do move before CONCAT */
305         ctx->tmpbb = apr_brigade_split_ex(ctx->b, e, ctx->tmpbb);
306
307         if (mode == AP_MODE_READBYTES) {
308             APR_BRIGADE_CONCAT(b, ctx->b);
309         }
310         else if (mode == AP_MODE_SPECULATIVE) {
311             apr_bucket *copy_bucket;
312
313             for (e = APR_BRIGADE_FIRST(ctx->b);
314                  e != APR_BRIGADE_SENTINEL(ctx->b);
315                  e = APR_BUCKET_NEXT(e))
316             {
317                 rv = apr_bucket_copy(e, &copy_bucket);
318                 if (rv != APR_SUCCESS) {
319                     return rv;
320                 }
321                 APR_BRIGADE_INSERT_TAIL(b, copy_bucket);
322             }
323         }
324
325         /* Take what was originally there and place it back on ctx->b */
326         APR_BRIGADE_CONCAT(ctx->b, ctx->tmpbb);
327     }
328     return APR_SUCCESS;
329 }
330
331 static void setaside_remaining_output(ap_filter_t *f,
332                                       core_output_filter_ctx_t *ctx,
333                                       apr_bucket_brigade *bb,
334                                       conn_rec *c);
335
336 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
337                                              apr_bucket_brigade *bb,
338                                              apr_size_t *bytes_written,
339                                              conn_rec *c);
340
341 static void remove_empty_buckets(apr_bucket_brigade *bb);
342
343 static apr_status_t send_brigade_blocking(apr_socket_t *s,
344                                           apr_bucket_brigade *bb,
345                                           apr_size_t *bytes_written,
346                                           conn_rec *c);
347
348 static apr_status_t writev_nonblocking(apr_socket_t *s,
349                                        struct iovec *vec, apr_size_t nvec,
350                                        apr_bucket_brigade *bb,
351                                        apr_size_t *cumulative_bytes_written,
352                                        conn_rec *c);
353
354 #if APR_HAS_SENDFILE
355 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
356                                          apr_bucket *bucket,
357                                          apr_size_t *cumulative_bytes_written,
358                                          conn_rec *c);
359 #endif
360
361 /* XXX: Should these be configurable parameters? */
362 #define THRESHOLD_MIN_WRITE 4096
363 #define THRESHOLD_MAX_BUFFER 65536
364 #define MAX_REQUESTS_IN_PIPELINE 5
365
366 /* Optional function coming from mod_logio, used for logging of output
367  * traffic
368  */
369 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
370
371 apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
372 {
373     conn_rec *c = f->c;
374     core_net_rec *net = f->ctx;
375     core_output_filter_ctx_t *ctx = net->out_ctx;
376     apr_bucket_brigade *bb = NULL;
377     apr_bucket *bucket, *next, *flush_upto = NULL;
378     apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
379     int eor_buckets_in_brigade, morphing_bucket_in_brigade;
380     apr_status_t rv;
381     int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
382
383     /* Fail quickly if the connection has already been aborted. */
384     if (c->aborted) {
385         if (new_bb != NULL) {
386             apr_brigade_cleanup(new_bb);
387         }
388         return APR_ECONNABORTED;
389     }
390
391     if (ctx == NULL) {
392         ctx = apr_pcalloc(c->pool, sizeof(*ctx));
393         net->out_ctx = (core_output_filter_ctx_t *)ctx;
394         /*
395          * Need to create tmp brigade with correct lifetime. Passing
396          * NULL to apr_brigade_split_ex would result in a brigade
397          * allocated from bb->pool which might be wrong.
398          */
399         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
400         /* same for buffered_bb and ap_save_brigade */
401         ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
402     }
403
404     if (new_bb != NULL)
405         bb = new_bb;
406
407     if ((ctx->buffered_bb != NULL) &&
408         !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
409         if (new_bb != NULL) {
410             APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
411         }
412         else {
413             bb = ctx->buffered_bb;
414         }
415         c->data_in_output_filters = 0;
416     }
417     else if (new_bb == NULL) {
418         return APR_SUCCESS;
419     }
420
421     /* Scan through the brigade and decide whether to attempt a write,
422      * and how much to write, based on the following rules:
423      *
424      *  1) The new_bb is null: Do a nonblocking write of as much as
425      *     possible: do a nonblocking write of as much data as possible,
426      *     then save the rest in ctx->buffered_bb.  (If new_bb == NULL,
427      *     it probably means that the MPM is doing asynchronous write
428      *     completion and has just determined that this connection
429      *     is writable.)
430      *
431      *  2) Determine if and up to which bucket we need to do a blocking
432      *     write:
433      *
434      *  a) The brigade contains a flush bucket: Do a blocking write
435      *     of everything up that point.
436      *
437      *  b) The request is in CONN_STATE_HANDLER state, and the brigade
438      *     contains at least THRESHOLD_MAX_BUFFER bytes in non-file
439      *     buckets: Do blocking writes until the amount of data in the
440      *     buffer is less than THRESHOLD_MAX_BUFFER.  (The point of this
441      *     rule is to provide flow control, in case a handler is
442      *     streaming out lots of data faster than the data can be
443      *     sent to the client.)
444      *
445      *  c) The request is in CONN_STATE_HANDLER state, and the brigade
446      *     contains at least MAX_REQUESTS_IN_PIPELINE EOR buckets:
447      *     Do blocking writes until less than MAX_REQUESTS_IN_PIPELINE EOR
448      *     buckets are left. (The point of this rule is to prevent too many
449      *     FDs being kept open by pipelined requests, possibly allowing a
450      *     DoS).
451      *
452      *  d) The brigade contains a morphing bucket: If there was no other
453      *     reason to do a blocking write yet, try reading the bucket. If its
454      *     contents fit into memory before THRESHOLD_MAX_BUFFER is reached,
455      *     everything is fine. Otherwise we need to do a blocking write the
456      *     up to and including the morphing bucket, because ap_save_brigade()
457      *     would read the whole bucket into memory later on.
458      *
459      *  3) Actually do the blocking write up to the last bucket determined
460      *     by rules 2a-d. The point of doing only one flush is to make as
461      *     few calls to writev() as possible.
462      *
463      *  4) If the brigade contains at least THRESHOLD_MIN_WRITE
464      *     bytes: Do a nonblocking write of as much data as possible,
465      *     then save the rest in ctx->buffered_bb.
466      */
467
468     if (new_bb == NULL) {
469         rv = send_brigade_nonblocking(net->client_socket, bb,
470                                       &(ctx->bytes_written), c);
471         if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
472             /* The client has aborted the connection */
473             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
474                           "core_output_filter: writing data to the network");
475             apr_brigade_cleanup(bb);
476             c->aborted = 1;
477             return rv;
478         }
479         setaside_remaining_output(f, ctx, bb, c);
480         return APR_SUCCESS;
481     }
482
483     bytes_in_brigade = 0;
484     non_file_bytes_in_brigade = 0;
485     eor_buckets_in_brigade = 0;
486     morphing_bucket_in_brigade = 0;
487
488     for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
489          bucket = next) {
490         next = APR_BUCKET_NEXT(bucket);
491
492         if (!APR_BUCKET_IS_METADATA(bucket)) {
493             if (bucket->length == (apr_size_t)-1) {
494                 /*
495                  * A setaside of morphing buckets would read everything into
496                  * memory. Instead, we will flush everything up to and
497                  * including this bucket.
498                  */
499                 morphing_bucket_in_brigade = 1;
500             }
501             else {
502                 bytes_in_brigade += bucket->length;
503                 if (!APR_BUCKET_IS_FILE(bucket))
504                     non_file_bytes_in_brigade += bucket->length;
505             }
506         }
507         else if (AP_BUCKET_IS_EOR(bucket)) {
508             eor_buckets_in_brigade++;
509         }
510
511         if (APR_BUCKET_IS_FLUSH(bucket)
512             || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
513             || morphing_bucket_in_brigade
514             || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
515             /* this segment of the brigade MUST be sent before returning. */
516
517             if (loglevel >= APLOG_TRACE6) {
518                 char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
519                                "FLUSH bucket" :
520                                (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
521                                "THRESHOLD_MAX_BUFFER" :
522                                morphing_bucket_in_brigade ? "morphing bucket" :
523                                "MAX_REQUESTS_IN_PIPELINE";
524                 ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
525                               "will flush because of %s", reason);
526                 ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
527                               "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
528                               ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
529                               "buckets: %d, morphing buckets: %d",
530                               flush_upto == NULL ? " so far"
531                                                  : " since last flush point",
532                               bytes_in_brigade,
533                               non_file_bytes_in_brigade,
534                               eor_buckets_in_brigade,
535                               morphing_bucket_in_brigade);
536             }
537             /*
538              * Defer the actual blocking write to avoid doing many writes.
539              */
540             flush_upto = next;
541
542             bytes_in_brigade = 0;
543             non_file_bytes_in_brigade = 0;
544             eor_buckets_in_brigade = 0;
545             morphing_bucket_in_brigade = 0;
546         }
547     }
548
549     if (flush_upto != NULL) {
550         ctx->tmp_flush_bb = apr_brigade_split_ex(bb, flush_upto,
551                                                  ctx->tmp_flush_bb);
552         if (loglevel >= APLOG_TRACE8) {
553                 ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
554                               "flushing now");
555         }
556         rv = send_brigade_blocking(net->client_socket, bb,
557                                    &(ctx->bytes_written), c);
558         if (rv != APR_SUCCESS) {
559             /* The client has aborted the connection */
560             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
561                           "core_output_filter: writing data to the network");
562             apr_brigade_cleanup(bb);
563             c->aborted = 1;
564             return rv;
565         }
566         if (loglevel >= APLOG_TRACE8) {
567                 ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
568                               "total bytes written: %" APR_SIZE_T_FMT,
569                               ctx->bytes_written);
570         }
571         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
572     }
573
574     if (loglevel >= APLOG_TRACE8) {
575         ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
576                       "brigade contains: bytes: %" APR_SIZE_T_FMT
577                       ", non-file bytes: %" APR_SIZE_T_FMT
578                       ", eor buckets: %d, morphing buckets: %d",
579                       bytes_in_brigade, non_file_bytes_in_brigade,
580                       eor_buckets_in_brigade, morphing_bucket_in_brigade);
581     }
582
583     if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
584         rv = send_brigade_nonblocking(net->client_socket, bb,
585                                       &(ctx->bytes_written), c);
586         if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
587             /* The client has aborted the connection */
588             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
589                           "core_output_filter: writing data to the network");
590             apr_brigade_cleanup(bb);
591             c->aborted = 1;
592             return rv;
593         }
594         if (loglevel >= APLOG_TRACE8) {
595                 ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
596                               "tried nonblocking write, total bytes "
597                               "written: %" APR_SIZE_T_FMT,
598                               ctx->bytes_written);
599         }
600     }
601
602     setaside_remaining_output(f, ctx, bb, c);
603     return APR_SUCCESS;
604 }
605
606 /*
607  * This function assumes that either ctx->buffered_bb == NULL, or
608  * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
609  */
610 static void setaside_remaining_output(ap_filter_t *f,
611                                       core_output_filter_ctx_t *ctx,
612                                       apr_bucket_brigade *bb,
613                                       conn_rec *c)
614 {
615     if (bb == NULL) {
616         return;
617     }
618     remove_empty_buckets(bb);
619     if (!APR_BRIGADE_EMPTY(bb)) {
620         c->data_in_output_filters = 1;
621         if (bb != ctx->buffered_bb) {
622             if (!ctx->deferred_write_pool) {
623                 apr_pool_create(&ctx->deferred_write_pool, c->pool);
624                 apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
625             }
626             ap_save_brigade(f, &(ctx->buffered_bb), &bb,
627                             ctx->deferred_write_pool);
628         }
629     }
630     else if (ctx->deferred_write_pool) {
631         /*
632          * There are no more requests in the pipeline. We can just clear the
633          * pool.
634          */
635         apr_pool_clear(ctx->deferred_write_pool);
636     }
637 }
638
639 #ifndef APR_MAX_IOVEC_SIZE
640 #define MAX_IOVEC_TO_WRITE 16
641 #else
642 #if APR_MAX_IOVEC_SIZE > 16
643 #define MAX_IOVEC_TO_WRITE 16
644 #else
645 #define MAX_IOVEC_TO_WRITE APR_MAX_IOVEC_SIZE
646 #endif
647 #endif
648
649 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
650                                              apr_bucket_brigade *bb,
651                                              apr_size_t *bytes_written,
652                                              conn_rec *c)
653 {
654     apr_bucket *bucket, *next;
655     apr_status_t rv;
656     struct iovec vec[MAX_IOVEC_TO_WRITE];
657     apr_size_t nvec = 0;
658
659     remove_empty_buckets(bb);
660
661     for (bucket = APR_BRIGADE_FIRST(bb);
662          bucket != APR_BRIGADE_SENTINEL(bb);
663          bucket = next) {
664         next = APR_BUCKET_NEXT(bucket);
665 #if APR_HAS_SENDFILE
666         if (APR_BUCKET_IS_FILE(bucket)) {
667             apr_bucket_file *file_bucket = (apr_bucket_file *)(bucket->data);
668             apr_file_t *fd = file_bucket->fd;
669             /* Use sendfile to send this file unless:
670              *   - the platform doesn't support sendfile,
671              *   - the file is too small for sendfile to be useful, or
672              *   - sendfile is disabled in the httpd config via "EnableSendfile off"
673              */
674
675             if ((apr_file_flags_get(fd) & APR_SENDFILE_ENABLED) &&
676                 (bucket->length >= AP_MIN_SENDFILE_BYTES)) {
677                 if (nvec > 0) {
678                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 1);
679                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
680                     if (rv != APR_SUCCESS) {
681                         (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
682                         return rv;
683                     }
684                 }
685                 rv = sendfile_nonblocking(s, bucket, bytes_written, c);
686                 if (nvec > 0) {
687                     (void)apr_socket_opt_set(s, APR_TCP_NOPUSH, 0);
688                     nvec = 0;
689                 }
690                 if (rv != APR_SUCCESS) {
691                     return rv;
692                 }
693                 break;
694             }
695         }
696 #endif /* APR_HAS_SENDFILE */
697         /* didn't sendfile */
698         if (!APR_BUCKET_IS_METADATA(bucket)) {
699             const char *data;
700             apr_size_t length;
701             
702             /* Non-blocking read first, in case this is a morphing
703              * bucket type. */
704             rv = apr_bucket_read(bucket, &data, &length, APR_NONBLOCK_READ);
705             if (APR_STATUS_IS_EAGAIN(rv)) {
706                 /* Read would block; flush any pending data and retry. */
707                 if (nvec) {
708                     rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
709                     if (rv) {
710                         return rv;
711                     }
712                     nvec = 0;
713                 }
714                 
715                 rv = apr_bucket_read(bucket, &data, &length, APR_BLOCK_READ);
716             }
717             if (rv != APR_SUCCESS) {
718                 return rv;
719             }
720
721             /* reading may have split the bucket, so recompute next: */
722             next = APR_BUCKET_NEXT(bucket);
723             vec[nvec].iov_base = (char *)data;
724             vec[nvec].iov_len = length;
725             nvec++;
726             if (nvec == MAX_IOVEC_TO_WRITE) {
727                 rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
728                 nvec = 0;
729                 if (rv != APR_SUCCESS) {
730                     return rv;
731                 }
732                 break;
733             }
734         }
735     }
736
737     if (nvec > 0) {
738         rv = writev_nonblocking(s, vec, nvec, bb, bytes_written, c);
739         if (rv != APR_SUCCESS) {
740             return rv;
741         }
742     }
743
744     remove_empty_buckets(bb);
745
746     return APR_SUCCESS;
747 }
748
749 static void remove_empty_buckets(apr_bucket_brigade *bb)
750 {
751     apr_bucket *bucket;
752     while (((bucket = APR_BRIGADE_FIRST(bb)) != APR_BRIGADE_SENTINEL(bb)) &&
753            (APR_BUCKET_IS_METADATA(bucket) || (bucket->length == 0))) {
754         apr_bucket_delete(bucket);
755     }
756 }
757
758 static apr_status_t send_brigade_blocking(apr_socket_t *s,
759                                           apr_bucket_brigade *bb,
760                                           apr_size_t *bytes_written,
761                                           conn_rec *c)
762 {
763     apr_status_t rv;
764
765     rv = APR_SUCCESS;
766     while (!APR_BRIGADE_EMPTY(bb)) {
767         rv = send_brigade_nonblocking(s, bb, bytes_written, c);
768         if (rv != APR_SUCCESS) {
769             if (APR_STATUS_IS_EAGAIN(rv)) {
770                 /* Wait until we can send more data */
771                 apr_int32_t nsds;
772                 apr_interval_time_t timeout;
773                 apr_pollfd_t pollset;
774
775                 pollset.p = c->pool;
776                 pollset.desc_type = APR_POLL_SOCKET;
777                 pollset.reqevents = APR_POLLOUT;
778                 pollset.desc.s = s;
779                 apr_socket_timeout_get(s, &timeout);
780                 do {
781                     rv = apr_poll(&pollset, 1, &nsds, timeout);
782                 } while (APR_STATUS_IS_EINTR(rv));
783                 if (rv != APR_SUCCESS) {
784                     break;
785                 }
786             }
787             else {
788                 break;
789             }
790         }
791     }
792     return rv;
793 }
794
795 static apr_status_t writev_nonblocking(apr_socket_t *s,
796                                        struct iovec *vec, apr_size_t nvec,
797                                        apr_bucket_brigade *bb,
798                                        apr_size_t *cumulative_bytes_written,
799                                        conn_rec *c)
800 {
801     apr_status_t rv = APR_SUCCESS, arv;
802     apr_size_t bytes_written = 0, bytes_to_write = 0;
803     apr_size_t i, offset;
804     apr_interval_time_t old_timeout;
805
806     arv = apr_socket_timeout_get(s, &old_timeout);
807     if (arv != APR_SUCCESS) {
808         return arv;
809     }
810     arv = apr_socket_timeout_set(s, 0);
811     if (arv != APR_SUCCESS) {
812         return arv;
813     }
814
815     for (i = 0; i < nvec; i++) {
816         bytes_to_write += vec[i].iov_len;
817     }
818     offset = 0;
819     while (bytes_written < bytes_to_write) {
820         apr_size_t n = 0;
821         rv = apr_socket_sendv(s, vec + offset, nvec - offset, &n);
822         if (n > 0) {
823             bytes_written += n;
824             for (i = offset; i < nvec; ) {
825                 apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
826                 if (APR_BUCKET_IS_METADATA(bucket)) {
827                     apr_bucket_delete(bucket);
828                 }
829                 else if (n >= vec[i].iov_len) {
830                     apr_bucket_delete(bucket);
831                     offset++;
832                     n -= vec[i++].iov_len;
833                 }
834                 else {
835                     apr_bucket_split(bucket, n);
836                     apr_bucket_delete(bucket);
837                     vec[i].iov_len -= n;
838                     vec[i].iov_base = (char *) vec[i].iov_base + n;
839                     break;
840                 }
841             }
842         }
843         if (rv != APR_SUCCESS) {
844             break;
845         }
846     }
847     if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
848         ap__logio_add_bytes_out(c, bytes_written);
849     }
850     *cumulative_bytes_written += bytes_written;
851
852     arv = apr_socket_timeout_set(s, old_timeout);
853     if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
854         return arv;
855     }
856     else {
857         return rv;
858     }
859 }
860
861 #if APR_HAS_SENDFILE
862
863 static apr_status_t sendfile_nonblocking(apr_socket_t *s,
864                                          apr_bucket *bucket,
865                                          apr_size_t *cumulative_bytes_written,
866                                          conn_rec *c)
867 {
868     apr_status_t rv = APR_SUCCESS;
869     apr_bucket_file *file_bucket;
870     apr_file_t *fd;
871     apr_size_t file_length;
872     apr_off_t file_offset;
873     apr_size_t bytes_written = 0;
874
875     if (!APR_BUCKET_IS_FILE(bucket)) {
876         ap_log_error(APLOG_MARK, APLOG_ERR, rv, c->base_server, APLOGNO(00006)
877                      "core_filter: sendfile_nonblocking: "
878                      "this should never happen");
879         return APR_EGENERAL;
880     }
881     file_bucket = (apr_bucket_file *)(bucket->data);
882     fd = file_bucket->fd;
883     file_length = bucket->length;
884     file_offset = bucket->start;
885
886     if (bytes_written < file_length) {
887         apr_size_t n = file_length - bytes_written;
888         apr_status_t arv;
889         apr_interval_time_t old_timeout;
890
891         arv = apr_socket_timeout_get(s, &old_timeout);
892         if (arv != APR_SUCCESS) {
893             return arv;
894         }
895         arv = apr_socket_timeout_set(s, 0);
896         if (arv != APR_SUCCESS) {
897             return arv;
898         }
899         rv = apr_socket_sendfile(s, fd, NULL, &file_offset, &n, 0);
900         if (rv == APR_SUCCESS) {
901             bytes_written += n;
902             file_offset += n;
903         }
904         arv = apr_socket_timeout_set(s, old_timeout);
905         if ((arv != APR_SUCCESS) && (rv == APR_SUCCESS)) {
906             rv = arv;
907         }
908     }
909     if ((ap__logio_add_bytes_out != NULL) && (bytes_written > 0)) {
910         ap__logio_add_bytes_out(c, bytes_written);
911     }
912     *cumulative_bytes_written += bytes_written;
913     if ((bytes_written < file_length) && (bytes_written > 0)) {
914         apr_bucket_split(bucket, bytes_written);
915         apr_bucket_delete(bucket);
916     }
917     else if (bytes_written == file_length) {
918         apr_bucket_delete(bucket);
919     }
920     return rv;
921 }
922
923 #endif