]> granicus.if.org Git - apache/commitdiff
core: Extend support for asynchronous write completion from the
authorGraham Leggett <minfrin@apache.org>
Sun, 4 Oct 2015 10:10:51 +0000 (10:10 +0000)
committerGraham Leggett <minfrin@apache.org>
Sun, 4 Oct 2015 10:10:51 +0000 (10:10 +0000)
network filter to any connection or request filter.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1706669 13f79535-47bb-0310-9956-ffa450edef68

16 files changed:
CHANGES
include/ap_mmn.h
include/http_core.h
include/http_request.h
include/httpd.h
include/util_filter.h
modules/http/http_core.c
modules/http/http_request.c
modules/ssl/ssl_engine_io.c
server/core.c
server/core_filters.c
server/mpm/event/event.c
server/mpm/motorz/motorz.c
server/mpm/simple/simple_io.c
server/request.c
server/util_filter.c

diff --git a/CHANGES b/CHANGES
index 0a4e8be3cd3c5f5788bded2db49cab5fe161b07c..9c055a24767c89e2cc7fb64ecfcd19ebfb058656 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) core: Extend support for asynchronous write completion from the
+     network filter to any connection or request filter. [Graham Leggett]
+
   *) mpm_event: Free memory earlier when shutting down processes.
      [Stefan Fritsch]
 
index 1ca59143a670c0880b8b069ae4eb885d0a9d50b7..631562e4afcc09cdeda2761d350e226812c54fcc 100644 (file)
  *                         protocol_switch and protocol_get. Add
  *                         ap_select_protocol(), ap_switch_protocol(),
  *                         ap_get_protocol(). Add HTTP_MISDIRECTED_REQUEST.
+ * 20150222.5 (2.5.0-dev)  Add ap_request_core_filter(),
+ *                         ap_filter_setaside_brigade(),
+ *                         ap_filter_reinstate_brigade() and
+ *                         ap_filter_should_yield(). Add empty and filters to
+ *                         conn_rec.
  */
 
 #define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */
 #ifndef MODULE_MAGIC_NUMBER_MAJOR
 #define MODULE_MAGIC_NUMBER_MAJOR 20150222
 #endif
-#define MODULE_MAGIC_NUMBER_MINOR 4                 /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 5                 /* 0...n */
 
 /**
  * Determine if the server's current MODULE_MAGIC_NUMBER is at least a
index 13196e1059d36349f033c59b9d6a545315970b43..dbb310db8ed32503e016fb66d8a2c687ad9caba6 100644 (file)
@@ -792,6 +792,7 @@ AP_DECLARE_DATA extern ap_filter_rec_t *ap_subreq_core_filter_handle;
 AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_output_filter_handle;
 AP_DECLARE_DATA extern ap_filter_rec_t *ap_content_length_filter_handle;
 AP_DECLARE_DATA extern ap_filter_rec_t *ap_core_input_filter_handle;
+AP_DECLARE_DATA extern ap_filter_rec_t *ap_request_core_filter_handle;
 
 /**
  * This hook provdes a way for modules to provide metrics/statistics about
index 595042bb25d3c94cdb6f08e05b9d7d24020e0efa..9ec771c9f5301fdbacb09a1621928ea6a1915102 100644 (file)
@@ -149,6 +149,18 @@ AP_DECLARE(int) ap_run_sub_req(request_rec *r);
  */
 AP_DECLARE(void) ap_destroy_sub_req(request_rec *r);
 
+/**
+ * An output filter that makes morphing buckets (buckets of unknown length)
+ * morph before they are passed to the connection filters, so that setting
+ * them aside there does not defeat async write completion. This should be
+ * inserted at the end of a request filter stack.
+ * @param f The current filter
+ * @param bb The brigade to filter
+ * @return status code
+ */
+AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
+                                                            apr_bucket_brigade *bb);
+
 /*
  * Then there's the case that you want some other request to be served
  * as the top-level request INSTEAD of what the client requested directly.
index ee5034c167ac7c826d9bea612bb884ed13b56fc8..d36ed10a02aa3ab246a0fbdfa2707421811d8e92 100644 (file)
@@ -55,6 +55,7 @@
 #include "apr_buckets.h"
 #include "apr_poll.h"
 #include "apr_thread_proc.h"
+#include "apr_hash.h"
 
 #include "os.h"
 
@@ -1136,7 +1137,7 @@ struct conn_rec {
     conn_state_t *cs;
     /** Is there data pending in the input filters? */
     int data_in_input_filters;
-    /** Is there data pending in the output filters? */
+    /** No longer used, replaced with ap_filter_should_yield() */
     int data_in_output_filters;
 
     /** Are there any filters that clogg/buffer the input stream, breaking
@@ -1191,6 +1192,12 @@ struct conn_rec {
 
     /** Array of requests being handled under this connection. */
     apr_array_header_t *requests;
+
+    /** Empty bucket brigade */
+    apr_bucket_brigade *empty;
+
+    /** Hashtable of filters with setaside buckets for write completion */
+    apr_hash_t *filters;
 };
 
 struct conn_slave_rec {
index 5a966074ff862537c63a5acc9c8535b463e68370..0e013c68bff1c3e71d61e0a8fd677d2f7aaf6efd 100644 (file)
@@ -278,6 +278,13 @@ struct ap_filter_t {
      *  to the request_rec, except that it is used for connection filters.
      */
     conn_rec *c;
+
+    /** Buffered data associated with the current filter. */
+    apr_bucket_brigade *bb;
+
+    /** Dedicated pool to use for deferred writes. */
+    apr_pool_t *deferred_pool;
+
 };
 
 /**
@@ -519,8 +526,11 @@ AP_DECLARE(apr_status_t) ap_remove_output_filter_byhandle(ap_filter_t *next,
  */
 
 /**
- * prepare a bucket brigade to be setaside.  If a different brigade was
+ * Prepare a bucket brigade to be setaside.  If a different brigade was
  * set-aside earlier, then the two brigades are concatenated together.
+ *
+ * If *save_to is NULL, the brigade will be created, and a cleanup registered
+ * to clear the brigade address when the pool is destroyed.
  * @param f The current filter
  * @param save_to The brigade that was previously set-aside.  Regardless, the
  *             new bucket brigade is returned in this location.
@@ -532,6 +542,53 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
                                          apr_bucket_brigade **save_to,
                                          apr_bucket_brigade **b, apr_pool_t *p);
 
+/**
+ * Prepare a bucket brigade to be setaside, creating a dedicated pool if
+ * necessary within the filter to handle the lifetime of the setaside brigade.
+ * @param f The current filter
+ * @param bb The bucket brigade to set aside.  This brigade is always empty
+ *          on return
+ */
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+                                                    apr_bucket_brigade *bb);
+
+/**
+ * Reinstate a brigade setaside earlier, and calculate the amount of data we
+ * should write based on the presence of flush buckets, size limits on in
+ * memory buckets, and the number of outstanding requests in the pipeline.
+ * This is a safety mechanism to protect against a module that might try to
+ * generate data too quickly for downstream to handle without yielding as
+ * it should.
+ *
+ * If the brigade passed in is empty, we reinstate the brigade and return
+ * immediately on the assumption that any buckets needing to be flushed were
+ * flushed before being passed to ap_filter_setaside_brigade().
+ *
+ * @param f The current filter
+ * @param bb The bucket brigade to restore to.
+ * @param flush_upto Work out the bucket we need to flush up to, based on the
+ *                   presence of a flush bucket, size limits on in-memory
+ *                   buckets, size limits on the number of requests outstanding
+ *                   in the pipeline.
+ * @return APR_SUCCESS.
+ */
+AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                                     apr_bucket_brigade *bb,
+                                                     apr_bucket **flush_upto);
+
+/**
+ * This function calculates whether there are any as yet unsent
+ * buffered brigades in downstream filters, and returns non zero
+ * if so.
+ *
+ * A filter should use this to determine whether the passing of data
+ * downstream might block, and so defer the passing of brigades
+ * downstream with ap_filter_setaside_brigade().
+ *
+ * This function can be called safely from a handler.
+ */
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f);
+
 /**
  * Flush function for apr_brigade_* calls.  This calls ap_pass_brigade
  * to flush the brigade if the brigade buffer overflows.
index 345de8109ae119860b0fdb8bd393587760fa2a48..f2ca67fc9c4bb41de32552d2d639066746c42f05 100644 (file)
@@ -263,6 +263,8 @@ static int http_create_request(request_rec *r)
                                     NULL, r, r->connection);
         ap_add_output_filter_handle(ap_http_outerror_filter_handle,
                                     NULL, r, r->connection);
+        ap_add_output_filter_handle(ap_request_core_filter_handle,
+                                    NULL, r, r->connection);
     }
 
     return OK;
index 0143a2e73fbc45a1a981833313b5157fdf1d4e8a..28580c67db55b3321604decdca295b8ea5e37975 100644 (file)
@@ -256,6 +256,14 @@ AP_DECLARE(void) ap_process_request_after_handler(request_rec *r)
     apr_bucket *b;
     conn_rec *c = r->connection;
 
+    /* Find the last request, taking into account internal
+     * redirects. We want to send the EOR bucket at the end of
+     * all the buckets so it does not jump the queue.
+     */
+    while (r->next) {
+        r = r->next;
+    }
+
     /* Send an EOR bucket through the output filter chain.  When
      * this bucket is destroyed, the request will be logged and
      * its pool will be freed
@@ -264,8 +272,8 @@ AP_DECLARE(void) ap_process_request_after_handler(request_rec *r)
     b = ap_bucket_eor_create(c->bucket_alloc, r);
     APR_BRIGADE_INSERT_HEAD(bb, b);
 
-    ap_pass_brigade(c->output_filters, bb);
-    
+    ap_pass_brigade(r->output_filters, bb);
+
     /* The EOR bucket has either been handled by an output filter (eg.
      * deleted or moved to a buffered_bb => no more in bb), or an error
      * occured before that (eg. c->aborted => still in bb) and we ought
index 7ad9e1382dbcca6384d2370a64a6cf9d99c86c2f..f230ba56987f0f3797bddb9df7ad06a6398e91ad 100644 (file)
@@ -1682,6 +1682,7 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
     ssl_filter_ctx_t *filter_ctx = f->ctx;
     bio_filter_in_ctx_t *inctx;
     bio_filter_out_ctx_t *outctx;
+    apr_bucket *flush_upto = NULL;
     apr_read_type_e rblock = APR_NONBLOCK_READ;
 
     if (f->c->aborted) {
@@ -1689,6 +1690,9 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
         return APR_ECONNABORTED;
     }
 
+    /* Reinstate any buffered content */
+    ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
     if (!filter_ctx->pssl) {
         /* ssl_filter_io_shutdown was called */
         return ap_pass_brigade(f->next, bb);
@@ -1711,6 +1715,16 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
     while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
         apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
 
+        /* if the core has set aside data, back off and try later */
+        if (!flush_upto) {
+            if (ap_filter_should_yield(f)) {
+                break;
+            }
+        }
+        else if (flush_upto == bucket) {
+            flush_upto = NULL;
+        }
+
         if (APR_BUCKET_IS_METADATA(bucket)) {
             /* Pass through metadata buckets untouched.  EOC is
              * special; terminate the SSL layer first. */
@@ -1762,6 +1776,10 @@ static apr_status_t ssl_io_filter_output(ap_filter_t *f,
 
     }
 
+    if (APR_STATUS_IS_EOF(status) || (status == APR_SUCCESS)) {
+        return ap_filter_setaside_brigade(f, bb);
+    }
+
     return status;
 }
 
index 4d33a00f721e430e23f5d8de7cddae547074ad9b..de3fa23f917b1f27ff3d58ef3c4d552812cd5bf0 100644 (file)
@@ -112,6 +112,7 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, insert_network_bucket,
 
 /* Handles for core filters */
 AP_DECLARE_DATA ap_filter_rec_t *ap_subreq_core_filter_handle;
+AP_DECLARE_DATA ap_filter_rec_t *ap_request_core_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_output_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_content_length_filter_handle;
 AP_DECLARE_DATA ap_filter_rec_t *ap_core_input_filter_handle;
@@ -5007,6 +5008,8 @@ static conn_rec *core_create_conn(apr_pool_t *ptrans, server_rec *s,
 
     c->id = id;
     c->bucket_alloc = alloc;
+    c->empty = apr_brigade_create(c->pool, c->bucket_alloc);
+    c->filters = apr_hash_make(c->pool);
 
     c->clogging_input_filters = 0;
 
@@ -5395,6 +5398,9 @@ static void register_hooks(apr_pool_t *p)
     ap_core_output_filter_handle =
         ap_register_output_filter("CORE", ap_core_output_filter,
                                   NULL, AP_FTYPE_NETWORK);
+    ap_request_core_filter_handle =
+        ap_register_output_filter("REQ_CORE", ap_request_core_filter,
+                                  NULL, AP_FTYPE_TRANSCODE);
     ap_subreq_core_filter_handle =
         ap_register_output_filter("SUBREQ_CORE", ap_sub_req_output_filter,
                                   NULL, AP_FTYPE_CONTENT_SET);
index a6c2bd666b24a631c7a1477a7c02307e7f49e4cf..0f530f868ff645ce3476f56d6557b5607e23eaba 100644 (file)
@@ -78,9 +78,7 @@ do { \
 #define APLOG_MODULE_INDEX AP_CORE_MODULE_INDEX
 
 struct core_output_filter_ctx {
-    apr_bucket_brigade *buffered_bb;
     apr_bucket_brigade *tmp_flush_bb;
-    apr_pool_t *deferred_write_pool;
     apr_size_t bytes_written;
 };
 
@@ -328,11 +326,6 @@ apr_status_t ap_core_input_filter(ap_filter_t *f, apr_bucket_brigade *b,
     return APR_SUCCESS;
 }
 
-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c);
-
 static apr_status_t send_brigade_nonblocking(apr_socket_t *s,
                                              apr_bucket_brigade *bb,
                                              apr_size_t *bytes_written,
@@ -358,33 +351,23 @@ static apr_status_t sendfile_nonblocking(apr_socket_t *s,
                                          conn_rec *c);
 #endif
 
-/* XXX: Should these be configurable parameters? */
-#define THRESHOLD_MIN_WRITE 4096
-#define THRESHOLD_MAX_BUFFER 65536
-#define MAX_REQUESTS_IN_PIPELINE 5
-
 /* Optional function coming from mod_logio, used for logging of output
  * traffic
  */
 extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *ap__logio_add_bytes_out;
 
-apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
+apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *bb)
 {
     conn_rec *c = f->c;
     core_net_rec *net = f->ctx;
     core_output_filter_ctx_t *ctx = net->out_ctx;
-    apr_bucket_brigade *bb = NULL;
-    apr_bucket *bucket, *next, *flush_upto = NULL;
-    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
-    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    apr_bucket *flush_upto = NULL;
     apr_status_t rv;
     int loglevel = ap_get_conn_module_loglevel(c, APLOG_MODULE_INDEX);
 
     /* Fail quickly if the connection has already been aborted. */
     if (c->aborted) {
-        if (new_bb != NULL) {
-            apr_brigade_cleanup(new_bb);
-        }
+        apr_brigade_cleanup(bb);
         return APR_ECONNABORTED;
     }
 
@@ -397,33 +380,14 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
          * allocated from bb->pool which might be wrong.
          */
         ctx->tmp_flush_bb = apr_brigade_create(c->pool, c->bucket_alloc);
-        /* same for buffered_bb and ap_save_brigade */
-        ctx->buffered_bb = apr_brigade_create(c->pool, c->bucket_alloc);
-    }
-
-    if (new_bb != NULL)
-        bb = new_bb;
-
-    if ((ctx->buffered_bb != NULL) &&
-        !APR_BRIGADE_EMPTY(ctx->buffered_bb)) {
-        if (new_bb != NULL) {
-            APR_BRIGADE_PREPEND(bb, ctx->buffered_bb);
-        }
-        else {
-            bb = ctx->buffered_bb;
-        }
-        c->data_in_output_filters = 0;
-    }
-    else if (new_bb == NULL) {
-        return APR_SUCCESS;
     }
 
     /* Scan through the brigade and decide whether to attempt a write,
      * and how much to write, based on the following rules:
      *
-     *  1) The new_bb is null: Do a nonblocking write of as much as
+     *  1) The bb is empty: Do a nonblocking write of as much as
      *     possible: do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb.  (If new_bb == NULL,
+     *     then set the rest aside in the filter.  (If bb is empty,
      *     it probably means that the MPM is doing asynchronous write
      *     completion and has just determined that this connection
      *     is writable.)
@@ -459,91 +423,12 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
      *  3) Actually do the blocking write up to the last bucket determined
      *     by rules 2a-d. The point of doing only one flush is to make as
      *     few calls to writev() as possible.
-     *
-     *  4) If the brigade contains at least THRESHOLD_MIN_WRITE
-     *     bytes: Do a nonblocking write of as much data as possible,
-     *     then save the rest in ctx->buffered_bb.
      */
 
-    if (new_bb == NULL) {
-        rv = send_brigade_nonblocking(net->client_socket, bb,
-                                      &(ctx->bytes_written), c);
-        if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            apr_brigade_cleanup(bb);
-            c->aborted = 1;
-            return rv;
-        }
-        setaside_remaining_output(f, ctx, bb, c);
-        return APR_SUCCESS;
-    }
-
-    bytes_in_brigade = 0;
-    non_file_bytes_in_brigade = 0;
-    eor_buckets_in_brigade = 0;
-    morphing_bucket_in_brigade = 0;
-
-    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
-         bucket = next) {
-        next = APR_BUCKET_NEXT(bucket);
-
-        if (!APR_BUCKET_IS_METADATA(bucket)) {
-            if (bucket->length == (apr_size_t)-1) {
-                /*
-                 * A setaside of morphing buckets would read everything into
-                 * memory. Instead, we will flush everything up to and
-                 * including this bucket.
-                 */
-                morphing_bucket_in_brigade = 1;
-            }
-            else {
-                bytes_in_brigade += bucket->length;
-                if (!APR_BUCKET_IS_FILE(bucket))
-                    non_file_bytes_in_brigade += bucket->length;
-            }
-        }
-        else if (AP_BUCKET_IS_EOR(bucket)) {
-            eor_buckets_in_brigade++;
-        }
-
-        if (APR_BUCKET_IS_FLUSH(bucket)
-            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
-            || morphing_bucket_in_brigade
-            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
-            /* this segment of the brigade MUST be sent before returning. */
-
-            if (loglevel >= APLOG_TRACE6) {
-                char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
-                               "FLUSH bucket" :
-                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
-                               "THRESHOLD_MAX_BUFFER" :
-                               morphing_bucket_in_brigade ? "morphing bucket" :
-                               "MAX_REQUESTS_IN_PIPELINE";
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, c,
-                              "will flush because of %s", reason);
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
-                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
-                              "buckets: %d, morphing buckets: %d",
-                              flush_upto == NULL ? " so far"
-                                                 : " since last flush point",
-                              bytes_in_brigade,
-                              non_file_bytes_in_brigade,
-                              eor_buckets_in_brigade,
-                              morphing_bucket_in_brigade);
-            }
-            /*
-             * Defer the actual blocking write to avoid doing many writes.
-             */
-            flush_upto = next;
+    ap_filter_reinstate_brigade(f, bb, &flush_upto);
 
-            bytes_in_brigade = 0;
-            non_file_bytes_in_brigade = 0;
-            eor_buckets_in_brigade = 0;
-            morphing_bucket_in_brigade = 0;
-        }
+    if (APR_BRIGADE_EMPTY(bb)) {
+        return APR_SUCCESS;
     }
 
     if (flush_upto != NULL) {
@@ -571,71 +456,30 @@ apr_status_t ap_core_output_filter(ap_filter_t *f, apr_bucket_brigade *new_bb)
         APR_BRIGADE_CONCAT(bb, ctx->tmp_flush_bb);
     }
 
+    rv = send_brigade_nonblocking(net->client_socket, bb, &(ctx->bytes_written),
+            c);
+    if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
+        /* The client has aborted the connection */
+        ap_log_cerror(
+                APLOG_MARK, APLOG_TRACE1, rv, c,
+                "core_output_filter: writing data to the network");
+        apr_brigade_cleanup(bb);
+        c->aborted = 1;
+        return rv;
+    }
     if (loglevel >= APLOG_TRACE8) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                      "brigade contains: bytes: %" APR_SIZE_T_FMT
-                      ", non-file bytes: %" APR_SIZE_T_FMT
-                      ", eor buckets: %d, morphing buckets: %d",
-                      bytes_in_brigade, non_file_bytes_in_brigade,
-                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
+        ap_log_cerror(
+                APLOG_MARK, APLOG_TRACE8, 0, c,
+                "tried nonblocking write, total bytes "
+                "written: %" APR_SIZE_T_FMT, ctx->bytes_written);
     }
 
-    if (bytes_in_brigade >= THRESHOLD_MIN_WRITE) {
-        rv = send_brigade_nonblocking(net->client_socket, bb,
-                                      &(ctx->bytes_written), c);
-        if ((rv != APR_SUCCESS) && (!APR_STATUS_IS_EAGAIN(rv))) {
-            /* The client has aborted the connection */
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c,
-                          "core_output_filter: writing data to the network");
-            apr_brigade_cleanup(bb);
-            c->aborted = 1;
-            return rv;
-        }
-        if (loglevel >= APLOG_TRACE8) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, c,
-                              "tried nonblocking write, total bytes "
-                              "written: %" APR_SIZE_T_FMT,
-                              ctx->bytes_written);
-        }
-    }
+    remove_empty_buckets(bb);
+    ap_filter_setaside_brigade(f, bb);
 
-    setaside_remaining_output(f, ctx, bb, c);
     return APR_SUCCESS;
 }
 
-/*
- * This function assumes that either ctx->buffered_bb == NULL, or
- * ctx->buffered_bb is empty, or ctx->buffered_bb == bb
- */
-static void setaside_remaining_output(ap_filter_t *f,
-                                      core_output_filter_ctx_t *ctx,
-                                      apr_bucket_brigade *bb,
-                                      conn_rec *c)
-{
-    if (bb == NULL) {
-        return;
-    }
-    remove_empty_buckets(bb);
-    if (!APR_BRIGADE_EMPTY(bb)) {
-        c->data_in_output_filters = 1;
-        if (bb != ctx->buffered_bb) {
-            if (!ctx->deferred_write_pool) {
-                apr_pool_create(&ctx->deferred_write_pool, c->pool);
-                apr_pool_tag(ctx->deferred_write_pool, "deferred_write");
-            }
-            ap_save_brigade(f, &(ctx->buffered_bb), &bb,
-                            ctx->deferred_write_pool);
-        }
-    }
-    else if (ctx->deferred_write_pool) {
-        /*
-         * There are no more requests in the pipeline. We can just clear the
-         * pool.
-         */
-        apr_pool_clear(ctx->deferred_write_pool);
-    }
-}
-
 #ifndef APR_MAX_IOVEC_SIZE
 #define MAX_IOVEC_TO_WRITE 16
 #else
index ee0d8fe6909f7a1090cf04f93cf78e5998dd212e..1cdc52c762c6f3a9d39bcfd21ce4e3f31e7cf6cc 100644 (file)
@@ -1146,19 +1146,38 @@ read_request:
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
-        ap_filter_t *output_filter = c->output_filters;
-        apr_status_t rv;
+        apr_hash_index_t *rindex;
+        apr_status_t rv = APR_SUCCESS;
+        int data_in_output_filters = 0;
         ap_update_child_status_from_conn(sbh, SERVER_BUSY_WRITE, c);
-        while (output_filter->next != NULL) {
-            output_filter = output_filter->next;
+
+        rindex = apr_hash_first(NULL, c->filters);
+        while (rindex) {
+            ap_filter_t *f = apr_hash_this_val(rindex);
+
+            if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+                rv = ap_pass_brigade(f, c->empty);
+                apr_brigade_cleanup(c->empty);
+                if (APR_SUCCESS != rv) {
+                    ap_log_cerror(
+                            APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
+                            "write failure in '%s' output filter", f->frec->name);
+                    break;
+                }
+
+                if (ap_filter_should_yield(f)) {
+                    data_in_output_filters = 1;
+                }
+            }
+
+            rindex = apr_hash_next(rindex);
         }
-        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+
         if (rv != APR_SUCCESS) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
-                          "network write failure in core output filter");
             cs->pub.state = CONN_STATE_LINGER;
         }
-        else if (c->data_in_output_filters) {
+        else if (data_in_output_filters) {
             /* Still in WRITE_COMPLETION_STATE:
              * Set a write timeout for this connection, and let the
              * event thread poll for writeability.
index f359dbafb9fa8ba3ff028861f47826742b0847f6..a10ead0f59cc9e10064934651012dc4a200a95b9 100644 (file)
@@ -359,21 +359,41 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon)
         }
 
         if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
-            ap_filter_t *output_filter = c->output_filters;
-            ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
-            while (output_filter->next != NULL) {
-                output_filter = output_filter->next;
-            }
+            apr_hash_index_t *rindex;
+            apr_status_t rv = APR_SUCCESS;
+            int data_in_output_filters = 0;
+            ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
+
+            /* Give every filter that still holds set-aside data a chance
+             * to write it by passing it the connection's empty brigade.
+             */
+            rindex = apr_hash_first(NULL, c->filters);
+            while (rindex) {
+                ap_filter_t *f = apr_hash_this_val(rindex);
+
+                if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+                    rv = ap_pass_brigade(f, c->empty);
+                    apr_brigade_cleanup(c->empty);
+                    if (APR_SUCCESS != rv) {
+                        ap_log_cerror(
+                                APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(02848)
+                                "write failure in '%s' output filter", f->frec->name);
+                        break;
+                    }
+
+                    if (ap_filter_should_yield(f)) {
+                        data_in_output_filters = 1;
+                    }
+                }
 
-            rv = output_filter->frec->filter_func.out_func(output_filter,
-                                                           NULL);
+                rindex = apr_hash_next(rindex);
+            }
 
             if (rv != APR_SUCCESS) {
-                ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(02848)
-                             "network write failure in core output filter");
                 scon->cs.state = CONN_STATE_LINGER;
             }
-            else if (c->data_in_output_filters) {
+            else if (data_in_output_filters) {
                 /* Still in WRITE_COMPLETION_STATE:
                  * Set a write timeout for this connection, and let the
                  * event thread poll for writeability.
index b14aae474ff23cbbafd8d59ce7fc1b6c77bf9379..47c13d71c6ebf8cdd30f98ceedd8711d69b7587a 100644 (file)
@@ -92,20 +92,37 @@ static apr_status_t simple_io_process(simple_conn_t * scon)
         }
 
         if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
-            ap_filter_t *output_filter = c->output_filters;
-            while (output_filter->next != NULL) {
-                output_filter = output_filter->next;
-            }
+            apr_hash_index_t *rindex;
+            apr_status_t rv = APR_SUCCESS;
+            int data_in_output_filters = 0;
+
+            rindex = apr_hash_first(NULL, c->filters);
+            while (rindex) {
+                ap_filter_t *f = apr_hash_this_val(rindex);
+
+                if (!APR_BRIGADE_EMPTY(f->bb)) {
+
+                    rv = ap_pass_brigade(f, c->empty);
+                    apr_brigade_cleanup(c->empty);
+                    if (APR_SUCCESS != rv) {
+                        ap_log_cerror(
+                                APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00249)
+                                "write failure in '%s' output filter", f->frec->name);
+                        break;
+                    }
+
+                    if (ap_filter_should_yield(f)) {
+                        data_in_output_filters = 1;
+                    }
+                }
 
-            rv = output_filter->frec->filter_func.out_func(output_filter,
-                                                           NULL);
+                rindex = apr_hash_next(rindex);
+            }
 
             if (rv != APR_SUCCESS) {
-                ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO(00249)
-                             "network write failure in core output filter");
                 scon->cs.state = CONN_STATE_LINGER;
             }
-            else if (c->data_in_output_filters) {
+            else if (data_in_output_filters) {
                 /* Still in WRITE_COMPLETION_STATE:
                  * Set a write timeout for this connection, and let the
                  * event thread poll for writeability.
index 67e535d14c54d376c11f68b314ab92d22a673805..9c9ad9f93b8ed7c36034a9d52b634faf34c5eeb0 100644 (file)
@@ -2036,6 +2036,83 @@ AP_CORE_DECLARE_NONSTD(apr_status_t) ap_sub_req_output_filter(ap_filter_t *f,
     return APR_SUCCESS;
 }
 
+/*
+ * Output filter that passes buckets downstream one at a time, reading
+ * buckets of unknown length first so that they morph before they reach
+ * the connection filters.
+ *
+ * Any brigade set aside by an earlier call is reinstated first.  Until the
+ * flush point reported by ap_filter_reinstate_brigade() is reached, buckets
+ * are passed unconditionally; after that the loop stops as soon as
+ * ap_filter_should_yield() returns non zero, and whatever remains in bb is
+ * set aside for a later call.
+ *
+ * Returns the first failing status from apr_bucket_read() or
+ * ap_pass_brigade() (EOF from ap_pass_brigade() is not treated as a
+ * failure), otherwise the status of the last downstream pass.
+ */
+AP_CORE_DECLARE_NONSTD(apr_status_t) ap_request_core_filter(ap_filter_t *f,
+                                                            apr_bucket_brigade *bb)
+{
+    apr_bucket *flush_upto = NULL;
+    apr_status_t status = APR_SUCCESS;
+    apr_bucket_brigade *tmp_bb = f->ctx;
+
+    if (!tmp_bb) {
+        tmp_bb = f->ctx = apr_brigade_create(f->r->pool, f->c->bucket_alloc);
+    }
+
+    /* Reinstate any buffered content */
+    ap_filter_reinstate_brigade(f, bb, &flush_upto);
+
+    while (!APR_BRIGADE_EMPTY(bb)) {
+        apr_bucket *bucket = APR_BRIGADE_FIRST(bb);
+
+        /* if the core has set aside data, back off and try later */
+        if (!flush_upto) {
+            if (ap_filter_should_yield(f)) {
+                break;
+            }
+        }
+        else if (flush_upto == bucket) {
+            flush_upto = NULL;
+        }
+
+        /* have we found a morphing bucket? if so, force it to morph into something
+         * safe to pass down to the connection filters without needing to be set
+         * aside.
+         */
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length == (apr_size_t) - 1) {
+                const char *data;
+                apr_size_t size;
+                if (APR_SUCCESS
+                        != (status = apr_bucket_read(bucket, &data, &size,
+                                APR_BLOCK_READ))) {
+                    return status;
+                }
+            }
+        }
+
+        /* pass each bucket down the chain */
+        APR_BUCKET_REMOVE(bucket);
+        APR_BRIGADE_INSERT_TAIL(tmp_bb, bucket);
+
+        status = ap_pass_brigade(f->next, tmp_bb);
+        /* tmp_bb lives in f->ctx across calls; empty it so buckets a
+         * downstream filter did not consume are not passed again.
+         */
+        apr_brigade_cleanup(tmp_bb);
+        if (!APR_STATUS_IS_EOF(status) && (status != APR_SUCCESS)) {
+            return status;
+        }
+
+    }
+
+    ap_filter_setaside_brigade(f, bb);
+    return status;
+}
+
 extern APR_OPTIONAL_FN_TYPE(authz_some_auth_required) *ap__authz_ap_some_auth_required;
 
 AP_DECLARE(int) ap_some_auth_required(request_rec *r)
index 01eb533520470eb65962c5c7b0923b0da9996f13..ad14112ddba3dfa984d97be8e20c15e34829f52e 100644 (file)
@@ -24,6 +24,7 @@
 #include "http_config.h"
 #include "http_core.h"
 #include "http_log.h"
+#include "http_request.h"
 #include "util_filter.h"
 
 /* NOTE: Apache's current design doesn't allow a pool to be passed thru,
 #define FILTER_POOL     apr_hook_global_pool
 #include "ap_hooks.h"   /* for apr_hook_global_pool */
 
+/* XXX: Should these be configurable parameters? */
+#define THRESHOLD_MAX_BUFFER 65536
+#define MAX_REQUESTS_IN_PIPELINE 5
+
 /*
 ** This macro returns true/false if a given filter should be inserted BEFORE
 ** another filter. This will happen when one of: 1) there isn't another
@@ -319,6 +324,8 @@ static ap_filter_t *add_any_filter_handle(ap_filter_rec_t *frec, void *ctx,
     f->r = frec->ftype < AP_FTYPE_CONNECTION ? r : NULL;
     f->c = c;
     f->next = NULL;
+    f->bb = NULL;
+    f->deferred_pool = NULL;
 
     if (INSERT_BEFORE(f, *outf)) {
         f->next = *outf;
@@ -474,6 +481,16 @@ AP_DECLARE(void) ap_remove_input_filter(ap_filter_t *f)
 
 AP_DECLARE(void) ap_remove_output_filter(ap_filter_t *f)
 {
+
+    if ((f->bb) && !APR_BRIGADE_EMPTY(f->bb)) {
+        apr_brigade_cleanup(f->bb);
+    }
+
+    if (f->deferred_pool) {
+        apr_pool_destroy(f->deferred_pool);
+        f->deferred_pool = NULL;
+    }
+
     remove_any_filter(f, f->r ? &f->r->output_filters : NULL,
                       f->r ? &f->r->proto_output_filters : NULL,
                       &f->c->output_filters);
@@ -566,6 +583,7 @@ AP_DECLARE(apr_status_t) ap_pass_brigade(ap_filter_t *next,
 {
     if (next) {
         apr_bucket *e;
+
         if ((e = APR_BRIGADE_LAST(bb)) && APR_BUCKET_IS_EOS(e) && next->r) {
             /* This is only safe because HTTP_HEADER filter is always in
              * the filter stack.   This ensures that there is ALWAYS a
@@ -635,7 +653,8 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
     apr_status_t rv, srv = APR_SUCCESS;
 
     /* If have never stored any data in the filter, then we had better
-     * create an empty bucket brigade so that we can concat.
+     * create an empty bucket brigade so that we can concat. Register
+     * a cleanup to zero out the pointer if the pool is cleared.
      */
     if (!(*saveto)) {
         *saveto = apr_brigade_create(p, f->c->bucket_alloc);
@@ -673,6 +692,248 @@ AP_DECLARE(apr_status_t) ap_save_brigade(ap_filter_t *f,
     return srv;
 }
 
+/*
+ * Pool cleanup callback that drops a filter's entry from the filters hash
+ * of its connection.
+ *
+ * data is the ap_filter_t ** slot that served as the hash key when the
+ * entry was stored; passing a NULL value to apr_hash_set() deletes the
+ * entry for that key.
+ *
+ * Always returns APR_SUCCESS.
+ */
+static apr_status_t filters_cleanup(void *data)
+{
+    ap_filter_t **slot = data;
+    ap_filter_t *filter = *slot;
+
+    apr_hash_set(filter->c->filters, slot, sizeof(ap_filter_t **), NULL);
+
+    return APR_SUCCESS;
+}
+
+/*
+ * Park the contents of bb inside the filter's private brigade f->bb so
+ * they can be picked up again on a later invocation of the filter.
+ *
+ * - bb empty: nothing is parked. If the filter owns a deferred pool,
+ *   f->bb is cleaned up and that pool is cleared.
+ * - bb not empty, first use: f->bb is created and the filter is recorded
+ *   in f->c->filters; a pre-cleanup on the owning pool (request pool for
+ *   request filters, connection pool otherwise) removes that record.
+ * - request filter (f->r set): the buckets are moved onto the end of
+ *   f->bb as they are.
+ * - connection filter: the buckets are handed to ap_save_brigade() using
+ *   f->deferred_pool, a subpool of the connection pool that is created on
+ *   demand.
+ *
+ * Returns the status of ap_save_brigade() for connection filters and
+ * APR_SUCCESS in every other case.
+ */
+AP_DECLARE(apr_status_t) ap_filter_setaside_brigade(ap_filter_t *f,
+        apr_bucket_brigade *bb)
+{
+    int trace_level = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+
+    if (trace_level >= APLOG_TRACE6) {
+        const char *incoming = APR_BRIGADE_EMPTY(bb) ? "empty" : "full";
+        const char *held = (f->bb && !APR_BRIGADE_EMPTY(f->bb)) ? "full"
+                                                                 : "empty";
+
+        ap_log_cerror(
+            APLOG_MARK, APLOG_TRACE6, 0, f->c,
+            "setaside %s brigade to %s brigade in '%s' output filter",
+            incoming, held, f->frec->name);
+    }
+
+    if (APR_BRIGADE_EMPTY(bb)) {
+        /*
+         * Nothing to park. Release whatever f->bb still holds and recycle
+         * the deferred pool, if this filter ever created one.
+         */
+        if (f->deferred_pool) {
+            apr_brigade_cleanup(f->bb);
+            apr_pool_clear(f->deferred_pool);
+        }
+        return APR_SUCCESS;
+    }
+
+    /* First setaside for this filter: create f->bb and register the filter. */
+    if (!f->bb) {
+        apr_pool_t *owner = f->r ? f->r->pool : f->c->pool;
+        ap_filter_t **key = apr_palloc(owner, sizeof(ap_filter_t **));
+
+        *key = f;
+        apr_hash_set(f->c->filters, key, sizeof(ap_filter_t **), f);
+
+        f->bb = apr_brigade_create(owner, f->c->bucket_alloc);
+
+        /* make sure the hash entry goes away before the owning pool does */
+        apr_pool_pre_cleanup_register(owner, key, filters_cleanup);
+    }
+
+    /* Request filters simply append the buckets to f->bb. */
+    if (f->r) {
+        APR_BRIGADE_CONCAT(f->bb, bb);
+        return APR_SUCCESS;
+    }
+
+    /* Connection filters save the buckets into the deferred pool. */
+    if (!f->deferred_pool) {
+        apr_pool_create(&f->deferred_pool, f->c->pool);
+        apr_pool_tag(f->deferred_pool, "deferred_pool");
+    }
+
+    return ap_save_brigade(f, &f->bb, &bb, f->deferred_pool);
+}
+
+/*
+ * Put any content previously parked in f->bb back in front of bb and work
+ * out how much of the resulting brigade has to be written before the
+ * filter may return.
+ *
+ * f          - the output filter whose buffered brigade is reinstated
+ * bb         - the brigade handed to the filter; buffered buckets are
+ *              prepended to it
+ * flush_upto - out: set to the bucket following the last bucket that
+ *              triggered a mandatory flush (this can be the brigade
+ *              sentinel), or to NULL when nothing has to be flushed
+ *
+ * Always returns APR_SUCCESS.
+ */
+AP_DECLARE(apr_status_t) ap_filter_reinstate_brigade(ap_filter_t *f,
+                                                     apr_bucket_brigade *bb,
+                                                     apr_bucket **flush_upto)
+{
+    apr_bucket *bucket, *next;
+    apr_size_t bytes_in_brigade, non_file_bytes_in_brigade;
+    int eor_buckets_in_brigade, morphing_bucket_in_brigade;
+    int loglevel = ap_get_conn_module_loglevel(f->c, APLOG_MODULE_INDEX);
+
+    if (loglevel >= APLOG_TRACE6) {
+        ap_log_cerror(
+            APLOG_MARK, APLOG_TRACE6, 0, f->c,
+            "reinstate %s brigade to %s brigade in '%s' output filter",
+            (!f->bb || APR_BRIGADE_EMPTY(f->bb) ? "empty" : "full"),
+            (APR_BRIGADE_EMPTY(bb) ? "empty" : "full"), f->frec->name);
+    }
+
+    /* Buffered buckets are older than anything in bb, so they go first. */
+    if (f->bb && !APR_BRIGADE_EMPTY(f->bb)) {
+        APR_BRIGADE_PREPEND(bb, f->bb);
+    }
+
+    /*
+     * Determine if and up to which bucket we need to do a blocking write:
+     *
+     *  a) The brigade contains a flush bucket: Do a blocking write
+     *     of everything up to that point.
+     *
+     *  b) The brigade contains at least THRESHOLD_MAX_BUFFER bytes in
+     *     non-file buckets: Do blocking writes until the amount of data
+     *     in the buffer is less than THRESHOLD_MAX_BUFFER.  (The point of
+     *     this rule is to provide flow control, in case a handler is
+     *     streaming out lots of data faster than the data can be
+     *     sent to the client.)
+     *
+     *  c) The brigade contains more than MAX_REQUESTS_IN_PIPELINE EOR
+     *     buckets: Do blocking writes until no more than
+     *     MAX_REQUESTS_IN_PIPELINE EOR buckets are left. (The point of
+     *     this rule is to prevent too many FDs being kept open by
+     *     pipelined requests, possibly allowing a DoS).
+     *
+     *  d) The filter is a connection filter (f->r is NULL) and the
+     *     brigade contains a morphing bucket (length of -1): Do a
+     *     blocking write up to and including the morphing bucket, because
+     *     ap_save_brigade() would read the whole bucket into memory
+     *     later on.
+     *
+     * NOTE(review): earlier wording tied rules b) and c) to the
+     * CONN_STATE_HANDLER state; this function does not inspect the
+     * connection state - confirm whether callers are expected to.
+     */
+
+    *flush_upto = NULL;
+
+    bytes_in_brigade = 0;
+    non_file_bytes_in_brigade = 0;
+    eor_buckets_in_brigade = 0;
+    morphing_bucket_in_brigade = 0;
+
+    for (bucket = APR_BRIGADE_FIRST(bb); bucket != APR_BRIGADE_SENTINEL(bb);
+         bucket = next) {
+        next = APR_BUCKET_NEXT(bucket);
+
+        if (!APR_BUCKET_IS_METADATA(bucket)) {
+            if (bucket->length == (apr_size_t)-1) {
+                /*
+                 * A setaside of morphing buckets would read everything into
+                 * memory. Instead, we will flush everything up to and
+                 * including this bucket.
+                 */
+                morphing_bucket_in_brigade = 1;
+            }
+            else {
+                bytes_in_brigade += bucket->length;
+                if (!APR_BUCKET_IS_FILE(bucket)) {
+                    non_file_bytes_in_brigade += bucket->length;
+                }
+            }
+        }
+        else if (AP_BUCKET_IS_EOR(bucket)) {
+            eor_buckets_in_brigade++;
+        }
+
+        if (APR_BUCKET_IS_FLUSH(bucket)
+            || non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER
+            || (!f->r && morphing_bucket_in_brigade)
+            || eor_buckets_in_brigade > MAX_REQUESTS_IN_PIPELINE) {
+            /* this segment of the brigade MUST be sent before returning. */
+
+            if (loglevel >= APLOG_TRACE6) {
+                const char *reason = APR_BUCKET_IS_FLUSH(bucket) ?
+                               "FLUSH bucket" :
+                               (non_file_bytes_in_brigade >= THRESHOLD_MAX_BUFFER) ?
+                               "THRESHOLD_MAX_BUFFER" :
+                               (!f->r && morphing_bucket_in_brigade) ? "morphing bucket" :
+                               "MAX_REQUESTS_IN_PIPELINE";
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, f->c,
+                              "will flush because of %s", reason);
+                /*
+                 * *flush_upto still holds the previous flush point here
+                 * (NULL if there was none); the out-parameter pointer
+                 * itself is never NULL, so it must be dereferenced.
+                 */
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                              "seen in brigade%s: bytes: %" APR_SIZE_T_FMT
+                              ", non-file bytes: %" APR_SIZE_T_FMT ", eor "
+                              "buckets: %d, morphing buckets: %d",
+                              *flush_upto == NULL ? " so far"
+                                                  : " since last flush point",
+                              bytes_in_brigade,
+                              non_file_bytes_in_brigade,
+                              eor_buckets_in_brigade,
+                              morphing_bucket_in_brigade);
+            }
+            /*
+             * Defer the actual blocking write to avoid doing many writes.
+             */
+            *flush_upto = next;
+
+            /* start counting afresh for the segment after the flush point */
+            bytes_in_brigade = 0;
+            non_file_bytes_in_brigade = 0;
+            eor_buckets_in_brigade = 0;
+            morphing_bucket_in_brigade = 0;
+        }
+    }
+
+    if (loglevel >= APLOG_TRACE8) {
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, f->c,
+                      "brigade contains: bytes: %" APR_SIZE_T_FMT
+                      ", non-file bytes: %" APR_SIZE_T_FMT
+                      ", eor buckets: %d, morphing buckets: %d",
+                      bytes_in_brigade, non_file_bytes_in_brigade,
+                      eor_buckets_in_brigade, morphing_bucket_in_brigade);
+    }
+
+    return APR_SUCCESS;
+}
+
+/*
+ * Tell a filter whether it should stop passing data on for now because
+ * some filter from f onwards is still holding buffered data.
+ *
+ * When this returns non-zero the filter is expected to save its
+ * unprocessed buckets with ap_filter_setaside_brigade(), and on its next
+ * invocation restore them with ap_filter_reinstate_brigade() and carry on
+ * where it stopped. A return of zero means plain synchronous filter
+ * behaviour.
+ *
+ * Filters running on behalf of a subrequest never yield: there is no way
+ * to know how much data a subrequest will produce, and a following
+ * subrequest could not tell when the previous one had finished sending,
+ * so buckets could end up out of order. Their data is set aside, in
+ * order, once it reaches the filters of the main request, which is where
+ * yielding is allowed.
+ */
+AP_DECLARE(int) ap_filter_should_yield(ap_filter_t *f)
+{
+    ap_filter_t *cur;
+
+    /* subrequest filter: yielding is not permitted */
+    if (f->r && f->r->main) {
+        return 0;
+    }
+
+    /*
+     * Main request, internal redirect or connection filter: walk the
+     * chain starting at f and yield as soon as any filter is found with a
+     * non-empty buffered brigade.
+     */
+    for (cur = f; cur; cur = cur->next) {
+        if (cur->bb && !APR_BRIGADE_EMPTY(cur->bb)) {
+            return 1;
+        }
+    }
+
+    return 0;
+}
+
 AP_DECLARE_NONSTD(apr_status_t) ap_filter_flush(apr_bucket_brigade *bb,
                                                 void *ctx)
 {