]> granicus.if.org Git - apache/commitdiff
Introduce Suspendable Requests to the Event MPM.
authorPaul Querna <pquerna@apache.org>
Sat, 20 Sep 2008 11:58:08 +0000 (11:58 +0000)
committerPaul Querna <pquerna@apache.org>
Sat, 20 Sep 2008 11:58:08 +0000 (11:58 +0000)
Using this basic framework, you can return SUSPENDED from an HTTP Handler,
and then register a callback that is invoked by the MPM at a later time.

This initial version only supports _timers_ as callbacks, but in the future I
would like to add things like wait for socket activity, on a socket specified by
the handler.

Once in a callback, It is then the responsibility of the callback fucntion
to finish the HTTP Request handling, but this alows you to do cool things like
a fully async proxy, COMET support, or even rate limiting.

To prove I'm not insane, I've inlcuded an example module, mod_dialup.

You can configure it like this:
<Location "/docs">
  ModemStandard "V.32"
</Location>

And for static files inside that path, you will be rate limited to V.32 speeds,
aka 9.6 kilobits/second.

Does anyone besides Rüdiger read commit emails :-) ?

I know there are likely huge problems with this, but I would like to see how far
we can push the Event MPM, figure out what to do better, if there is anything,
and then really dive into the 3.0 development before ApacheCon.

* server/mpm/experimental/event/fdqueue.h:
    (timer_event_t): New structure to hold timer events and callback functions.

* server/mpm/experimental/event/fdqueue.c
    (ap_queue_empty): Modify to also look at Timer Ring.

    (ap_queue_init): Initialize Timer Ring.

    (ap_queue_push_timer): New function, pushes a timer event into the queue.

    (ap_queue_pop_something): Renamed function, returns a timer event or
        a socket/pool for a worker thread to run.

* server/mpm/experimental/event/event.c
    (process_socket): If the connection is in SUSPENDED state, don't force it
        into linger mode yet, the callback will have to take care of that.

    (push_timer2worker): New shortcut function, pushes timer event into queue
        for a worker to run.

    (timer_free_ring): New global data structure to recycle memory used by
        timer events.

    (timer_ring): New global data structure to hold active timer events.

    (g_timer_ring_mtx): Thread mutex to protect timer event data structures.

    (ap_mpm_register_timed_callback): New Function, registers a callback to be
        invoked by the MPM at a later time.

    (listener_thread): Calculate our wakeup time based on the upcoming Event
        Queue, and after pollset_poll runs, push any Timers that have passed
        onto worker threads to run.

    (worker_thread): Call new queue pop method, and if the Timer Event is
        non-null, invoke the callback.  Once the callback is done, push the
        structure onto the timer_free_ring, to be recycled.

    (child_main): Initialize new mutex and ring structures.

* server/config.c
    (ap_invoke_handler): Allow SUSPENDED aa valid return code from handlers.

* modules/http/http_core.c
    (ap_process_http_async_connection): Don't close the connection when in
        SUSPENDED state.

* modules/http/http_request.c
    (ap_process_request_after_handler): New function, body pulled from the old,
        ap_process_async_request.  Split to let handlers invoke this so they
        don't need to know all of the details of finishing a request.

    (ap_process_async_request): If the handler returns SUSPENDED, don't do
        anything but return.

* include/ap_mmn.h: Bump MMN.

* include/ap_mpm.h
    (ap_mpm_register_timed_callback): New function.

* include/httpd.h:
    (SUSPENDED): New return code for handlers.
    (request_rec::invoke_mtx): New mutex to protect callback invokcations
        from being run before the original handler finishes running.
    (conn_state_e): Add a suspended state.

* include/http_request.h
    (ap_process_request_after_handler): New function to make it easier for
        handlers to finish the HTTP Request.

* modules/test/config.m4: Add mod_dialup to build.

* modules/test/mod_dialup.c: New rate limiting module, requires the Event MPM
    to work.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@697357 13f79535-47bb-0310-9956-ffa450edef68

12 files changed:
include/ap_mmn.h
include/ap_mpm.h
include/http_request.h
include/httpd.h
modules/http/http_core.c
modules/http/http_request.c
modules/test/config.m4
modules/test/mod_dialup.c [new file with mode: 0644]
server/config.c
server/mpm/experimental/event/event.c
server/mpm/experimental/event/fdqueue.c
server/mpm/experimental/event/fdqueue.h

index 713713df85cac8bc6f296dee9ca0321f9598bf8f..c085299fa957a75e4d52655064c3149c7eb7e27f 100644 (file)
  * 20080722.2 (2.3.0-dev)  Add scolonsep to proxy_balancer
  * 20080829.0 (2.3.0-dev)  Add cookie attributes when removing cookies
  * 20080830.0 (2.3.0-dev)  Cookies can be set on headers_out and err_headers_out
+ * 20080920.0 (2.3.0-dev)  Add ap_mpm_register_timed_callback. 
  *
  */
 
 #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */
 
 #ifndef MODULE_MAGIC_NUMBER_MAJOR
-#define MODULE_MAGIC_NUMBER_MAJOR 20080830
+#define MODULE_MAGIC_NUMBER_MAJOR 20080920
 #endif
 #define MODULE_MAGIC_NUMBER_MINOR 0                     /* 0...n */
 
index eb3ef516635d03586b066380f9bb5a83a4e4ac25..787ceeca9676e934b19e3510e76c6d696b3fb216 100644 (file)
@@ -152,6 +152,14 @@ AP_DECLARE(apr_status_t) ap_os_create_privileged_process(
  */
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result);
 
+
+typedef void (ap_mpm_callback_fn_t)(void *baton);
+
+/* XXXXXXX: only added support in the Event MPM.... */
+AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t,
+                                                ap_mpm_callback_fn_t *cbfn,
+                                                void *baton);
+    
 /* Defining GPROF when compiling uses the moncontrol() function to
  * disable gprof profiling in the parent, and enable it only for
  * request processing in children (or in one_process mode).  It's
index cda1b955d54a60e63585cb79ef7922f70cfff1e1..de87d99742c4065b4b4020594eedde925690c5f0 100644 (file)
@@ -315,7 +315,10 @@ AP_DECLARE(void) ap_allow_standard_methods(request_rec *r, int reset, ...);
  */
 void ap_process_request(request_rec *);
 
-/**
+/* For post-processing after a handler has finished with a request. (Commonly used after it was suspended) */
+void ap_process_request_after_handler(request_rec *r);
+
+    /**
  * Process a top-level request from a client, allowing some or all of
  * the response to remain buffered in the core output filter for later,
  * asynchronous write completion
index 279713e3361c2a74925ec0110f2d3cabc63250fe..2be3bf7361e3a41e1d5daf4112466c25bf1a3b35 100644 (file)
@@ -457,6 +457,8 @@ AP_DECLARE(const char *) ap_get_server_built(void);
 #define DONE -2                        /**< Module has served the response completely 
                                 *  - it's safe to die() with no more output
                                 */
+#define SUSPENDED -3 /**< Module will handle the remainder of the request. 
+                      * The core will never invoke the request again, */
 #define OK 0                   /**< Module has handled this stage. */
 
 
@@ -989,6 +991,8 @@ struct request_rec {
     /** The optional kept body of the request. */
     apr_bucket_brigade *kept_body;
 
+    apr_thread_mutex_t *invoke_mtx;
+
 /* Things placed at the end of the record to avoid breaking binary
  * compatibility.  It would be nice to remember to reorder the entire
  * record to improve 64bit alignment the next time we need to break
@@ -1105,6 +1109,7 @@ typedef enum  {
     CONN_STATE_READ_REQUEST_LINE,
     CONN_STATE_HANDLER,
     CONN_STATE_WRITE_COMPLETION,
+    CONN_STATE_SUSPENDED,
     CONN_STATE_LINGER
 } conn_state_e;
 
index 0fd6b758ddfac24ab560378f7e6020d2d02023fc..fdf86c0644f4d739e7e8934d1ea0531b1c790e4e 100644 (file)
@@ -154,7 +154,8 @@ static int ap_process_http_async_connection(conn_rec *c)
                 r = NULL;
             }
 
-            if (cs->state != CONN_STATE_WRITE_COMPLETION) {
+            if (cs->state != CONN_STATE_WRITE_COMPLETION && 
+                cs->state != CONN_STATE_SUSPENDED) {
                 /* Something went wrong; close the connection */
                 cs->state = CONN_STATE_LINGER;
             }
index 3b17c76e549acf0bd96276820fc3481726816af3..5bac240da27cd71ac47a5fdf92aa7a6e2e3735b5 100644 (file)
@@ -213,13 +213,39 @@ static void check_pipeline(conn_rec *c)
 }
 
 
-void ap_process_async_request(request_rec *r)
+void ap_process_request_after_handler(request_rec *r)
 {
-    int access_status;
     apr_bucket_brigade *bb;
     apr_bucket *b;
     conn_rec *c = r->connection;
 
+    /* Send an EOR bucket through the output filter chain.  When
+     * this bucket is destroyed, the request will be logged and
+     * its pool will be freed
+     */
+    bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc);
+    b = ap_bucket_eor_create(r->connection->bucket_alloc, r);
+    APR_BRIGADE_INSERT_HEAD(bb, b);
+    
+    ap_pass_brigade(r->connection->output_filters, bb);
+    
+    /* From here onward, it is no longer safe to reference r
+     * or r->pool, because r->pool may have been destroyed
+     * already by the EOR bucket's cleanup function.
+     */
+    
+    c->cs->state = CONN_STATE_WRITE_COMPLETION;
+    check_pipeline(c);
+    if (ap_extended_status) {
+        ap_time_process_request(c->sbh, STOP_PREQUEST);
+    }
+}
+
+void ap_process_async_request(request_rec *r)
+{
+    conn_rec *c = r->connection;
+    int access_status;
+
     /* Give quick handlers a shot at serving the request on the fast
      * path, bypassing all of the other Apache hooks.
      *
@@ -234,8 +260,12 @@ void ap_process_async_request(request_rec *r)
      * Use this hook with extreme care and only if you know what you are
      * doing.
      */
-    if (ap_extended_status)
+    if (ap_extended_status) {
         ap_time_process_request(r->connection->sbh, START_PREQUEST);
+    }
+
+    apr_thread_mutex_create(&r->invoke_mtx, APR_THREAD_MUTEX_DEFAULT, r->pool);
+    apr_thread_mutex_lock(r->invoke_mtx);
     access_status = ap_run_quick_handler(r, 0);  /* Not a look-up request */
     if (access_status == DECLINED) {
         access_status = ap_process_request_internal(r);
@@ -244,6 +274,16 @@ void ap_process_async_request(request_rec *r)
         }
     }
 
+    if (access_status == SUSPENDED) {
+        if (ap_extended_status) {
+            ap_time_process_request(c->sbh, STOP_PREQUEST);
+        }
+        c->cs->state = CONN_STATE_SUSPENDED;
+        apr_thread_mutex_unlock(r->invoke_mtx);
+        return;
+    }
+    apr_thread_mutex_unlock(r->invoke_mtx);
+
     if (access_status == DONE) {
         /* e.g., something not in storage like TRACE */
         access_status = OK;
@@ -257,24 +297,7 @@ void ap_process_async_request(request_rec *r)
         ap_die(access_status, r);
     }
 
-    /* Send an EOR bucket through the output filter chain.  When
-     * this bucket is destroyed, the request will be logged and
-     * its pool will be freed
-     */
-    bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc);
-    b = ap_bucket_eor_create(r->connection->bucket_alloc, r);
-    APR_BRIGADE_INSERT_HEAD(bb, b);
-    ap_pass_brigade(r->connection->output_filters, bb);
-
-    /* From here onward, it is no longer safe to reference r
-     * or r->pool, because r->pool may have been destroyed
-     * already by the EOR bucket's cleanup function.
-     */
-
-    c->cs->state = CONN_STATE_WRITE_COMPLETION;
-    check_pipeline(c);
-    if (ap_extended_status)
-        ap_time_process_request(c->sbh, STOP_PREQUEST);
+    return ap_process_request_after_handler(r);
 }
 
 void ap_process_request(request_rec *r)
index 01bc0fa9717191217a07c19b25e1c5aa5db82142..9c150f488f47f3a220216051fd8fee63945b5e36 100644 (file)
@@ -6,4 +6,6 @@ APACHE_MODULE(optional_hook_import, example optional hook importer, , , no)
 APACHE_MODULE(optional_fn_import, example optional function importer, , , no)
 APACHE_MODULE(optional_fn_export, example optional function exporter, , , no)
 
+APACHE_MODULE(dialup, rate limits static files to dialup modem speeds, , , no)
+
 APACHE_MODPATH_FINISH
diff --git a/modules/test/mod_dialup.c b/modules/test/mod_dialup.c
new file mode 100644 (file)
index 0000000..349cd2c
--- /dev/null
@@ -0,0 +1,308 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+#include "httpd.h"
+#include "util_filter.h"
+#include "http_log.h"
+#include "http_config.h"
+#include "http_request.h"
+
+/* to detect sendfile enabled, we need CORE_PRIVATE. Someone should fix this. */
+#define CORE_PRIVATE
+#include "http_core.h"
+
+
+module AP_MODULE_DECLARE_DATA dialup_module;
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) (x * 1000)
+#endif
+
+
+typedef struct dialup_dcfg_t {
+    apr_size_t bytes_per_second;
+} dialup_dcfg_t;
+
+typedef struct dialup_baton_t {
+    apr_size_t bytes_per_second;
+    request_rec *r;
+    apr_file_t *fd;
+    apr_bucket_brigade *bb;
+    apr_bucket_brigade *tmpbb;
+} dialup_baton_t;
+
+static int
+dialup_send_pulse(dialup_baton_t *db)
+{
+    int status;
+    apr_off_t len = 0;
+    apr_size_t bytes_sent = 0;
+    
+    while (!APR_BRIGADE_EMPTY(db->bb) && bytes_sent < db->bytes_per_second) {
+        apr_bucket *e;
+
+        if (db->r->connection->aborted) {
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+        
+        status = apr_brigade_partition(db->bb, db->bytes_per_second, &e);
+
+        if (status != APR_SUCCESS && status != APR_INCOMPLETE) {
+            /* XXXXXX: Log me. */
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+
+        if (e != APR_BRIGADE_SENTINEL(db->bb)) {
+            apr_bucket *f;
+            apr_bucket *b = APR_BUCKET_PREV(e);
+            f = APR_RING_FIRST(&db->bb->list);
+            APR_RING_UNSPLICE(f, b, link);
+            APR_RING_SPLICE_HEAD(&db->tmpbb->list, f, b, apr_bucket, link);
+        }
+        else {
+            APR_BRIGADE_CONCAT(db->tmpbb, db->bb);
+        }
+        
+        e = apr_bucket_flush_create(db->r->connection->bucket_alloc);
+        
+        APR_BRIGADE_INSERT_TAIL(db->tmpbb, e);
+
+        apr_brigade_length(db->tmpbb, 1, &len);
+        bytes_sent += len;
+        status = ap_pass_brigade(db->r->output_filters, db->tmpbb);
+
+        apr_brigade_cleanup(db->tmpbb);
+
+        if (status != OK) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r,
+                          "dialup: pulse: ap_pass_brigade failed:");
+            return status;
+        }
+    }
+
+    if (APR_BRIGADE_EMPTY(db->bb)) {
+        return DONE;
+    }
+    else {
+        return SUSPENDED;
+    }
+}
+
+void 
+dialup_callback(void *baton)
+{
+    int status;
+    dialup_baton_t *db = (dialup_baton_t *)baton;
+
+    apr_thread_mutex_lock(db->r->invoke_mtx);
+
+    status = dialup_send_pulse(db);
+
+    if (status == SUSPENDED) {
+        ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, baton);
+    }
+    else if (status == DONE) {
+        apr_thread_mutex_unlock(db->r->invoke_mtx);
+        ap_finalize_request_protocol(db->r);
+        ap_process_request_after_handler(db->r);
+        return;
+    }
+    else {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r,
+                      "dialup: pulse returned: %d", status);
+        db->r->status = HTTP_OK;
+        ap_die(status, db->r);
+    }
+
+    apr_thread_mutex_unlock(db->r->invoke_mtx);
+}
+
+static int
+dialup_handler(request_rec *r)
+{
+    int status;
+    apr_status_t rv;
+
+    /* See core.c, default handler for all of the cases we just decline. */
+    if (r->method_number != M_GET || 
+        r->finfo.filetype == 0 || 
+        r->finfo.filetype == APR_DIR) {
+        return DECLINED;
+    }
+
+    dialup_dcfg_t *dcfg = ap_get_module_config(r->per_dir_config,
+                                               &dialup_module);
+    if (dcfg->bytes_per_second == 0) {
+        return DECLINED;
+    }
+    core_dir_config *ccfg = ap_get_module_config(r->per_dir_config,
+                                                     &core_module);
+    
+    apr_file_t *fd;
+
+    rv = apr_file_open(&fd, r->filename, APR_READ | APR_BINARY
+#if APR_HAS_SENDFILE
+                           | ((ccfg->enable_sendfile == ENABLE_SENDFILE_OFF)
+                              ? 0 : APR_SENDFILE_ENABLED)
+#endif
+                       , 0, r->pool);
+
+    if (rv) {
+        return DECLINED;
+    }
+
+    /* copied from default handler: */
+    ap_update_mtime(r, r->finfo.mtime);
+    ap_set_last_modified(r);
+    ap_set_etag(r);
+    apr_table_setn(r->headers_out, "Accept-Ranges", "bytes");
+    ap_set_content_length(r, r->finfo.size);
+
+    status = ap_meets_conditions(r);
+    if (status != OK) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+                      "dialup: declined, meets conditions, good luck core handler");
+        return DECLINED;
+    }
+
+    apr_bucket_brigade *bb;
+
+    dialup_baton_t *db = apr_palloc(r->pool, sizeof(dialup_baton_t));
+    
+    db->bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+    db->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+
+    apr_bucket *e;
+
+    e = apr_brigade_insert_file(db->bb, fd, 0, r->finfo.size, r->pool);
+
+#if APR_HAS_MMAP
+    if (ccfg->enable_mmap == ENABLE_MMAP_OFF) {
+        apr_bucket_file_enable_mmap(e, 0);
+    }
+#endif
+    
+    
+    db->bytes_per_second = dcfg->bytes_per_second;
+    db->r = r;
+    db->fd = fd;
+
+    e = apr_bucket_eos_create(r->connection->bucket_alloc);
+
+    APR_BRIGADE_INSERT_TAIL(db->bb, e);
+
+    status = dialup_send_pulse(db);
+    if (status != SUSPENDED && status != DONE) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+                      "dialup: failed, send pulse");
+        return status;
+    }
+
+    ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, db);
+
+    return SUSPENDED;
+}
+
+
+
+#ifndef APR_HOOK_ALMOST_LAST
+#define APR_HOOK_ALMOST_LAST (APR_HOOK_REALLY_LAST - 1)
+#endif
+
+static void
+dialup_register_hooks(apr_pool_t *p)
+{
+    ap_hook_handler(dialup_handler, NULL, NULL, APR_HOOK_ALMOST_LAST);
+}
+
+typedef struct modem_speed_t {
+    const char *name;
+    apr_size_t bytes_per_second;
+} modem_speed_t;
+
+#ifndef BITRATE_TO_BYTES
+#define BITRATE_TO_BYTES(x) ((1000 * x)/8)
+#endif
+
+static const modem_speed_t modem_bitrates[] =
+{
+    {"V.21",    BITRATE_TO_BYTES(0.1)},
+    {"V.26bis", BITRATE_TO_BYTES(2.4)},
+    {"V.32",    BITRATE_TO_BYTES(9.6)},
+    {"V.34",    BITRATE_TO_BYTES(28.8)},
+    {"V.92",    BITRATE_TO_BYTES(56.0)},
+    {"i-was-rich-and-got-a-leased-line", BITRATE_TO_BYTES(1500)},
+    {NULL, 0}
+};
+
+static const char *
+cmd_modem_standard(cmd_parms *cmd,
+             void *dconf,
+             const char *input)
+{
+    const modem_speed_t *standard;
+    int i = 0;
+    dialup_dcfg_t *dcfg = (dialup_dcfg_t*)dconf;
+    
+    dcfg->bytes_per_second = 0;
+
+    while (modem_bitrates[i].name != NULL) {
+        standard = &modem_bitrates[i];
+        if (strcasecmp(standard->name, input) == 0) {
+            dcfg->bytes_per_second = standard->bytes_per_second;
+            break;
+        }
+        i++;
+    }
+
+    if (dcfg->bytes_per_second == 0) {
+        return "mod_diaulup: Unkonwn Modem Standard specified.";
+    }
+
+    return NULL;
+}
+
+static void *
+dialup_dcfg_create(apr_pool_t *p, char *dummy)
+{
+    dialup_dcfg_t *cfg = apr_palloc(p, sizeof(dialup_dcfg_t));
+    
+    cfg->bytes_per_second = 0;
+
+    return cfg;
+}
+
+
+static const command_rec dialup_cmds[] =
+{
+    AP_INIT_TAKE1("ModemStandard", cmd_modem_standard, NULL, ACCESS_CONF,
+                  "Modem Standard to.. simulate. "
+                  "Must be one of: 'V.21', 'V.26bis', 'V.32', 'V.34', or 'V.92'"),
+    NULL
+};
+
+module AP_MODULE_DECLARE_DATA dialup_module =
+{
+    STANDARD20_MODULE_STUFF,
+    dialup_dcfg_create,
+    NULL,
+    NULL,
+    NULL,
+    dialup_cmds,
+    dialup_register_hooks
+};
index 3b8a00a111690120593d381af648e25360aa6f92..9dbeb25efb04e4f7b92f2992459fe16650d58d54 100644 (file)
@@ -381,7 +381,7 @@ AP_CORE_DECLARE(int) ap_invoke_handler(request_rec *r)
         ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
             "handler \"%s\" not found for: %s", r->handler, r->filename);
     }
-    if ((result != OK) && (result != DONE) && (result != DECLINED)
+    if ((result != OK) && (result != DONE) && (result != DECLINED) && (result != SUSPENDED)
         && !ap_is_HTTP_VALID_RESPONSE(result)) {
         /* If a module is deliberately returning something else
          * (request_rec in non-HTTP or proprietary extension?)
index 33868509c4f3970da52147dbea79d619ec6baeda..3aa10e686ae610a67805cd218c14ebff822cc7e1 100644 (file)
@@ -612,7 +612,9 @@ static int process_socket(apr_pool_t * p, apr_socket_t * sock,
          * like the Worker MPM does.
          */
         ap_run_process_connection(c);
-        cs->state = CONN_STATE_LINGER;
+        if (cs->state != CONN_STATE_SUSPENDED) {
+            cs->state = CONN_STATE_LINGER;
+        }
     }
 
 read_request:
@@ -796,6 +798,11 @@ static apr_status_t init_pollset(apr_pool_t *p)
     return APR_SUCCESS;
 }
 
+static apr_status_t push_timer2worker(timer_event_t* te)
+{
+    return ap_queue_push_timer(worker_queue, te);
+}
+
 static apr_status_t push2worker(const apr_pollfd_t * pfd,
                                 apr_pollset_t * pollset)
 {
@@ -871,8 +878,70 @@ static int get_worker(int *have_idle_worker_p)
     }
 }
 
+/* XXXXXX: Convert to skiplist or other better data structure 
+ * (yes, this is VERY VERY VERY VERY BAD)
+ */
+
+/* Structures to reuse */
+static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
+/* Active timers */
+static APR_RING_HEAD(timer_ring_t, timer_event_t) timer_ring;
+
+static apr_thread_mutex_t *g_timer_ring_mtx;
+
+AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t,
+                                                ap_mpm_callback_fn_t *cbfn,
+                                                void *baton)
+{
+    timer_event_t *ep;
+    timer_event_t *te;
+    /* oh yeah, and make locking smarter/fine grained. */
+    apr_thread_mutex_lock(g_timer_ring_mtx);
+
+    if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
+        te = APR_RING_FIRST(&timer_free_ring);
+        APR_RING_REMOVE(te, link);
+    }
+    else {
+        /* XXXXX: lol, pool allocation without a context from any thread.Yeah. Right. MPMs Suck. */
+        te = malloc(sizeof(timer_event_t));
+        APR_RING_ELEM_INIT(te, link);
+    }
+
+    te->cbfunc = cbfn;
+    te->baton = baton;
+    /* XXXXX: optimize */
+    te->when = t + apr_time_now();
+
+    /* Okay, insert sorted by when.. */
+    int inserted = 0;
+    for (ep = APR_RING_FIRST(&timer_ring);
+         ep != APR_RING_SENTINEL(&timer_ring,
+                                 timer_event_t, link);
+         ep = APR_RING_NEXT(ep, link))
+    {
+        if (ep->when > te->when) {
+            inserted = 1;
+            APR_RING_INSERT_BEFORE(ep, te, link);
+            break;
+        }
+    }
+    
+    if (!inserted) {
+        APR_RING_INSERT_TAIL(&timer_ring, te, timer_event_t, link);
+    }
+
+    apr_thread_mutex_unlock(g_timer_ring_mtx);
+}
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) (x * 1000)
+#endif
+
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 {
+    timer_event_t *ep;
+    timer_event_t *te;
     apr_status_t rc;
     proc_info *ti = dummy;
     int process_slot = ti->pid;
@@ -891,20 +960,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
     free(ti);
 
-    /* We set this to force apr_pollset to wakeup if there hasn't been any IO
-     * on any of its sockets.  This allows sockets to have been added
-     * when no other keepalive operations where going on.
-     *
-     * current value is 1 second
-     */
-    timeout_interval = 1000000;
-
     /* the following times out events that are really close in the future
      *   to prevent extra poll calls
      *
      * current value is .1 second
      */
 #define TIMEOUT_FUDGE_FACTOR 100000
+#define EVENT_FUDGE_FACTOR 10000
 
     rc = init_pollset(tpool);
     if (rc != APR_SUCCESS) {
@@ -927,6 +989,26 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             check_infinite_requests();
         }
 
+
+        {
+            apr_time_t now = apr_time_now();
+            apr_thread_mutex_lock(g_timer_ring_mtx);
+
+            if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) {
+                te = APR_RING_FIRST(&timer_ring);
+                if (te->when > now) {
+                    timeout_interval = te->when - now;
+                }
+                else {
+                    timeout_interval = 1;
+                }
+            }
+            else {
+                timeout_interval = apr_time_from_msec(100);
+            }
+            apr_thread_mutex_unlock(g_timer_ring_mtx);
+        }
+
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num,
                               &out_pfd);
 
@@ -945,6 +1027,25 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         if (listener_may_exit)
             break;
 
+        {
+            apr_time_t now = apr_time_now();
+            apr_thread_mutex_lock(g_timer_ring_mtx);
+            for (ep = APR_RING_FIRST(&timer_ring);
+                 ep != APR_RING_SENTINEL(&timer_ring,
+                                         timer_event_t, link);
+                 ep = APR_RING_FIRST(&timer_ring))
+            {
+                if (ep->when < now + EVENT_FUDGE_FACTOR) {
+                    APR_RING_REMOVE(ep, link);
+                    push_timer2worker(ep);
+                }
+                else {
+                    break;
+                }
+            }
+            apr_thread_mutex_unlock(g_timer_ring_mtx);
+        }
+
         while (num && get_worker(&have_idle_worker)) {
             pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
@@ -1143,7 +1244,8 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
     apr_pool_t *ptrans;         /* Pool for per-transaction stuff */
     apr_status_t rv;
     int is_idle = 0;
-
+    timer_event_t *te = NULL;
+    
     free(ti);
 
     ap_scoreboard_image->servers[process_slot][thread_slot].pid = ap_my_pid;
@@ -1171,7 +1273,10 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
         if (workers_may_exit) {
             break;
         }
-        rv = ap_queue_pop(worker_queue, &csd, &cs, &ptrans);
+
+        te = NULL;
+        
+        rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
 
         if (rv != APR_SUCCESS) {
             /* We get APR_EOF during a graceful shutdown once all the
@@ -1201,13 +1306,25 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
             }
             continue;
         }
-        is_idle = 0;
-        worker_sockets[thread_slot] = csd;
-        rv = process_socket(ptrans, csd, cs, process_slot, thread_slot);
-        if (!rv) {
-            requests_this_child--;
+        if (te != NULL) {
+            
+            te->cbfunc(te->baton);
+
+            {
+                apr_thread_mutex_lock(g_timer_ring_mtx);
+                APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t, link);
+                apr_thread_mutex_unlock(g_timer_ring_mtx);
+            }
+        }
+        else {
+            is_idle = 0;
+            worker_sockets[thread_slot] = csd;
+            rv = process_socket(ptrans, csd, cs, process_slot, thread_slot);
+            if (!rv) {
+                requests_this_child--;
+            }
+            worker_sockets[thread_slot] = NULL;
         }
-        worker_sockets[thread_slot] = NULL;
     }
 
     ap_update_child_status_from_indexes(process_slot, thread_slot,
@@ -1462,6 +1579,10 @@ static void child_main(int child_num_arg)
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
+    apr_thread_mutex_create(&g_timer_ring_mtx, APR_THREAD_MUTEX_DEFAULT, pchild);
+    APR_RING_INIT(&timer_free_ring, timer_event_t, link);
+    APR_RING_INIT(&timer_ring, timer_event_t, link);
+    
     ap_run_child_init(pchild, ap_server_conf);
 
     /* done with init critical section */
index e9f5f5345d4e8fd3dd120845a63391e60f6729d8..21005043940a06c4830e4e5525681cf85a48b62f 100644 (file)
@@ -268,7 +268,7 @@ apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info)
  * Detects when the fd_queue_t is empty. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
  */
-#define ap_queue_empty(queue) ((queue)->nelts == 0)
+#define ap_queue_empty(queue) ((queue)->nelts == 0 && APR_RING_EMPTY(&queue->timers ,timer_event_t, link))
 
 /**
  * Callback routine that is called to destroy this
@@ -305,6 +305,8 @@ apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity,
         return rv;
     }
 
+    APR_RING_INIT(&queue->timers, timer_event_t, link);
+
     queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
     queue->bounds = queue_capacity;
     queue->nelts = 0;
@@ -353,14 +355,36 @@ apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd,
     return APR_SUCCESS;
 }
 
+apr_status_t ap_queue_push_timer(fd_queue_t * queue, timer_event_t *te)
+{
+    apr_status_t rv;
+    
+    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    
+    AP_DEBUG_ASSERT(!queue->terminated);
+
+    APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link);
+
+    apr_thread_cond_signal(queue->not_empty);
+    
+    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    
+    return APR_SUCCESS;
+}
+
 /**
  * Retrieves the next available socket from the queue. If there are no
  * sockets available, it will block until one becomes available.
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
-                          conn_state_t ** cs, apr_pool_t ** p)
+apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
+                                    conn_state_t ** cs, apr_pool_t ** p,
+                                    timer_event_t ** te_out)
 {
     fd_queue_elem_t *elem;
     apr_status_t rv;
@@ -389,15 +413,23 @@ apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
         }
     }
 
-    elem = &queue->data[--queue->nelts];
-    *sd = elem->sd;
-    *cs = elem->cs;
-    *p = elem->p;
+    *te_out = NULL;
+
+    if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) {
+        *te_out = APR_RING_FIRST(&queue->timers);
+        APR_RING_REMOVE(*te_out, link);
+    }
+    else {
+        elem = &queue->data[--queue->nelts];
+        *sd = elem->sd;
+        *cs = elem->cs;
+        *p = elem->p;
 #ifdef AP_DEBUG
-    elem->sd = NULL;
-    elem->p = NULL;
+        elem->sd = NULL;
+        elem->p = NULL;
 #endif /* AP_DEBUG */
-
+    }
+    
     rv = apr_thread_mutex_unlock(queue->one_big_mutex);
     return rv;
 }
index bd18adfd185e58fb3088b66b47fce3e8bd82a643..9482d71b0c4a52f36c3cfbbbecb32f7545a802fd 100644 (file)
@@ -37,6 +37,8 @@
 #endif
 #include <apr_errno.h>
 
+#include "ap_mpm.h"
+
 typedef struct fd_queue_info_t fd_queue_info_t;
 
 apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info,
@@ -54,8 +56,19 @@ struct fd_queue_elem_t
 };
 typedef struct fd_queue_elem_t fd_queue_elem_t;
 
+typedef struct timer_event_t timer_event_t;
+
+struct timer_event_t {
+    APR_RING_ENTRY(timer_event_t) link;
+    apr_time_t when;
+    ap_mpm_callback_fn_t *cbfunc;
+    void *baton;
+};
+
+
 struct fd_queue_t
 {
+    APR_RING_HEAD(timers_t, timer_event_t) timers;
     fd_queue_elem_t *data;
     int nelts;
     int bounds;
@@ -73,8 +86,10 @@ apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity,
                            apr_pool_t * a);
 apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd,
                            conn_state_t * cs, apr_pool_t * p);
-apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
-                          conn_state_t ** cs, apr_pool_t ** p);
+apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te);
+apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
+                                    conn_state_t ** cs, apr_pool_t ** p,
+                                    timer_event_t ** te);
 apr_status_t ap_queue_interrupt_all(fd_queue_t * queue);
 apr_status_t ap_queue_term(fd_queue_t * queue);