Using this basic framework, you can return SUSPENDED from an HTTP Handler,
and then register a callback that is invoked by the MPM at a later time.
This initial version only supports _timers_ as callbacks, but in the future I
would like to add things like wait for socket activity, on a socket specified by
the handler.
Once in a callback, It is then the responsibility of the callback fucntion
to finish the HTTP Request handling, but this alows you to do cool things like
a fully async proxy, COMET support, or even rate limiting.
To prove I'm not insane, I've inlcuded an example module, mod_dialup.
You can configure it like this:
<Location "/docs">
ModemStandard "V.32"
</Location>
And for static files inside that path, you will be rate limited to V.32 speeds,
aka 9.6 kilobits/second.
Does anyone besides Rüdiger read commit emails :-) ?
I know there are likely huge problems with this, but I would like to see how far
we can push the Event MPM, figure out what to do better, if there is anything,
and then really dive into the 3.0 development before ApacheCon.
* server/mpm/experimental/event/fdqueue.h:
(timer_event_t): New structure to hold timer events and callback functions.
* server/mpm/experimental/event/fdqueue.c
(ap_queue_empty): Modify to also look at Timer Ring.
(ap_queue_init): Initialize Timer Ring.
(ap_queue_push_timer): New function, pushes a timer event into the queue.
(ap_queue_pop_something): Renamed function, returns a timer event or
a socket/pool for a worker thread to run.
* server/mpm/experimental/event/event.c
(process_socket): If the connection is in SUSPENDED state, don't force it
into linger mode yet, the callback will have to take care of that.
(push_timer2worker): New shortcut function, pushes timer event into queue
for a worker to run.
(timer_free_ring): New global data structure to recycle memory used by
timer events.
(timer_ring): New global data structure to hold active timer events.
(g_timer_ring_mtx): Thread mutex to protect timer event data structures.
(ap_mpm_register_timed_callback): New Function, registers a callback to be
invoked by the MPM at a later time.
(listener_thread): Calculate our wakeup time based on the upcoming Event
Queue, and after pollset_poll runs, push any Timers that have passed
onto worker threads to run.
(worker_thread): Call new queue pop method, and if the Timer Event is
non-null, invoke the callback. Once the callback is done, push the
structure onto the timer_free_ring, to be recycled.
(child_main): Initialize new mutex and ring structures.
* server/config.c
(ap_invoke_handler): Allow SUSPENDED aa valid return code from handlers.
* modules/http/http_core.c
(ap_process_http_async_connection): Don't close the connection when in
SUSPENDED state.
* modules/http/http_request.c
(ap_process_request_after_handler): New function, body pulled from the old,
ap_process_async_request. Split to let handlers invoke this so they
don't need to know all of the details of finishing a request.
(ap_process_async_request): If the handler returns SUSPENDED, don't do
anything but return.
* include/ap_mmn.h: Bump MMN.
* include/ap_mpm.h
(ap_mpm_register_timed_callback): New function.
* include/httpd.h:
(SUSPENDED): New return code for handlers.
(request_rec::invoke_mtx): New mutex to protect callback invokcations
from being run before the original handler finishes running.
(conn_state_e): Add a suspended state.
* include/http_request.h
(ap_process_request_after_handler): New function to make it easier for
handlers to finish the HTTP Request.
* modules/test/config.m4: Add mod_dialup to build.
* modules/test/mod_dialup.c: New rate limiting module, requires the Event MPM
to work.
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@697357
13f79535-47bb-0310-9956-
ffa450edef68
* 20080722.2 (2.3.0-dev) Add scolonsep to proxy_balancer
* 20080829.0 (2.3.0-dev) Add cookie attributes when removing cookies
* 20080830.0 (2.3.0-dev) Cookies can be set on headers_out and err_headers_out
+ * 20080920.0 (2.3.0-dev) Add ap_mpm_register_timed_callback.
*
*/
#define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */
#ifndef MODULE_MAGIC_NUMBER_MAJOR
-#define MODULE_MAGIC_NUMBER_MAJOR 20080830
+#define MODULE_MAGIC_NUMBER_MAJOR 20080920
#endif
#define MODULE_MAGIC_NUMBER_MINOR 0 /* 0...n */
*/
AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result);
+
+typedef void (ap_mpm_callback_fn_t)(void *baton);
+
+/* XXXXXXX: only added support in the Event MPM.... */
+AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton);
+
/* Defining GPROF when compiling uses the moncontrol() function to
* disable gprof profiling in the parent, and enable it only for
* request processing in children (or in one_process mode). It's
*/
void ap_process_request(request_rec *);
-/**
+/* For post-processing after a handler has finished with a request. (Commonly used after it was suspended) */
+void ap_process_request_after_handler(request_rec *r);
+
+ /**
* Process a top-level request from a client, allowing some or all of
* the response to remain buffered in the core output filter for later,
* asynchronous write completion
#define DONE -2 /**< Module has served the response completely
* - it's safe to die() with no more output
*/
+#define SUSPENDED -3 /**< Module will handle the remainder of the request.
+ * The core will never invoke the request again, */
#define OK 0 /**< Module has handled this stage. */
/** The optional kept body of the request. */
apr_bucket_brigade *kept_body;
+ apr_thread_mutex_t *invoke_mtx;
+
/* Things placed at the end of the record to avoid breaking binary
* compatibility. It would be nice to remember to reorder the entire
* record to improve 64bit alignment the next time we need to break
CONN_STATE_READ_REQUEST_LINE,
CONN_STATE_HANDLER,
CONN_STATE_WRITE_COMPLETION,
+ CONN_STATE_SUSPENDED,
CONN_STATE_LINGER
} conn_state_e;
r = NULL;
}
- if (cs->state != CONN_STATE_WRITE_COMPLETION) {
+ if (cs->state != CONN_STATE_WRITE_COMPLETION &&
+ cs->state != CONN_STATE_SUSPENDED) {
/* Something went wrong; close the connection */
cs->state = CONN_STATE_LINGER;
}
}
-void ap_process_async_request(request_rec *r)
+void ap_process_request_after_handler(request_rec *r)
{
- int access_status;
apr_bucket_brigade *bb;
apr_bucket *b;
conn_rec *c = r->connection;
+ /* Send an EOR bucket through the output filter chain. When
+ * this bucket is destroyed, the request will be logged and
+ * its pool will be freed
+ */
+ bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc);
+ b = ap_bucket_eor_create(r->connection->bucket_alloc, r);
+ APR_BRIGADE_INSERT_HEAD(bb, b);
+
+ ap_pass_brigade(r->connection->output_filters, bb);
+
+ /* From here onward, it is no longer safe to reference r
+ * or r->pool, because r->pool may have been destroyed
+ * already by the EOR bucket's cleanup function.
+ */
+
+ c->cs->state = CONN_STATE_WRITE_COMPLETION;
+ check_pipeline(c);
+ if (ap_extended_status) {
+ ap_time_process_request(c->sbh, STOP_PREQUEST);
+ }
+}
+
+void ap_process_async_request(request_rec *r)
+{
+ conn_rec *c = r->connection;
+ int access_status;
+
/* Give quick handlers a shot at serving the request on the fast
* path, bypassing all of the other Apache hooks.
*
* Use this hook with extreme care and only if you know what you are
* doing.
*/
- if (ap_extended_status)
+ if (ap_extended_status) {
ap_time_process_request(r->connection->sbh, START_PREQUEST);
+ }
+
+ apr_thread_mutex_create(&r->invoke_mtx, APR_THREAD_MUTEX_DEFAULT, r->pool);
+ apr_thread_mutex_lock(r->invoke_mtx);
access_status = ap_run_quick_handler(r, 0); /* Not a look-up request */
if (access_status == DECLINED) {
access_status = ap_process_request_internal(r);
}
}
+ if (access_status == SUSPENDED) {
+ if (ap_extended_status) {
+ ap_time_process_request(c->sbh, STOP_PREQUEST);
+ }
+ c->cs->state = CONN_STATE_SUSPENDED;
+ apr_thread_mutex_unlock(r->invoke_mtx);
+ return;
+ }
+ apr_thread_mutex_unlock(r->invoke_mtx);
+
if (access_status == DONE) {
/* e.g., something not in storage like TRACE */
access_status = OK;
ap_die(access_status, r);
}
- /* Send an EOR bucket through the output filter chain. When
- * this bucket is destroyed, the request will be logged and
- * its pool will be freed
- */
- bb = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc);
- b = ap_bucket_eor_create(r->connection->bucket_alloc, r);
- APR_BRIGADE_INSERT_HEAD(bb, b);
- ap_pass_brigade(r->connection->output_filters, bb);
-
- /* From here onward, it is no longer safe to reference r
- * or r->pool, because r->pool may have been destroyed
- * already by the EOR bucket's cleanup function.
- */
-
- c->cs->state = CONN_STATE_WRITE_COMPLETION;
- check_pipeline(c);
- if (ap_extended_status)
- ap_time_process_request(c->sbh, STOP_PREQUEST);
+ return ap_process_request_after_handler(r);
}
void ap_process_request(request_rec *r)
APACHE_MODULE(optional_fn_import, example optional function importer, , , no)
APACHE_MODULE(optional_fn_export, example optional function exporter, , , no)
+APACHE_MODULE(dialup, rate limits static files to dialup modem speeds, , , no)
+
APACHE_MODPATH_FINISH
--- /dev/null
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+#include "httpd.h"
+#include "util_filter.h"
+#include "http_log.h"
+#include "http_config.h"
+#include "http_request.h"
+
+/* to detect sendfile enabled, we need CORE_PRIVATE. Someone should fix this. */
+#define CORE_PRIVATE
+#include "http_core.h"
+
+
+module AP_MODULE_DECLARE_DATA dialup_module;
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) (x * 1000)
+#endif
+
+
+typedef struct dialup_dcfg_t {
+ apr_size_t bytes_per_second;
+} dialup_dcfg_t;
+
+typedef struct dialup_baton_t {
+ apr_size_t bytes_per_second;
+ request_rec *r;
+ apr_file_t *fd;
+ apr_bucket_brigade *bb;
+ apr_bucket_brigade *tmpbb;
+} dialup_baton_t;
+
+static int
+dialup_send_pulse(dialup_baton_t *db)
+{
+ int status;
+ apr_off_t len = 0;
+ apr_size_t bytes_sent = 0;
+
+ while (!APR_BRIGADE_EMPTY(db->bb) && bytes_sent < db->bytes_per_second) {
+ apr_bucket *e;
+
+ if (db->r->connection->aborted) {
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ status = apr_brigade_partition(db->bb, db->bytes_per_second, &e);
+
+ if (status != APR_SUCCESS && status != APR_INCOMPLETE) {
+ /* XXXXXX: Log me. */
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ if (e != APR_BRIGADE_SENTINEL(db->bb)) {
+ apr_bucket *f;
+ apr_bucket *b = APR_BUCKET_PREV(e);
+ f = APR_RING_FIRST(&db->bb->list);
+ APR_RING_UNSPLICE(f, b, link);
+ APR_RING_SPLICE_HEAD(&db->tmpbb->list, f, b, apr_bucket, link);
+ }
+ else {
+ APR_BRIGADE_CONCAT(db->tmpbb, db->bb);
+ }
+
+ e = apr_bucket_flush_create(db->r->connection->bucket_alloc);
+
+ APR_BRIGADE_INSERT_TAIL(db->tmpbb, e);
+
+ apr_brigade_length(db->tmpbb, 1, &len);
+ bytes_sent += len;
+ status = ap_pass_brigade(db->r->output_filters, db->tmpbb);
+
+ apr_brigade_cleanup(db->tmpbb);
+
+ if (status != OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r,
+ "dialup: pulse: ap_pass_brigade failed:");
+ return status;
+ }
+ }
+
+ if (APR_BRIGADE_EMPTY(db->bb)) {
+ return DONE;
+ }
+ else {
+ return SUSPENDED;
+ }
+}
+
+void
+dialup_callback(void *baton)
+{
+ int status;
+ dialup_baton_t *db = (dialup_baton_t *)baton;
+
+ apr_thread_mutex_lock(db->r->invoke_mtx);
+
+ status = dialup_send_pulse(db);
+
+ if (status == SUSPENDED) {
+ ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, baton);
+ }
+ else if (status == DONE) {
+ apr_thread_mutex_unlock(db->r->invoke_mtx);
+ ap_finalize_request_protocol(db->r);
+ ap_process_request_after_handler(db->r);
+ return;
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, status, db->r,
+ "dialup: pulse returned: %d", status);
+ db->r->status = HTTP_OK;
+ ap_die(status, db->r);
+ }
+
+ apr_thread_mutex_unlock(db->r->invoke_mtx);
+}
+
+static int
+dialup_handler(request_rec *r)
+{
+ int status;
+ apr_status_t rv;
+
+ /* See core.c, default handler for all of the cases we just decline. */
+ if (r->method_number != M_GET ||
+ r->finfo.filetype == 0 ||
+ r->finfo.filetype == APR_DIR) {
+ return DECLINED;
+ }
+
+ dialup_dcfg_t *dcfg = ap_get_module_config(r->per_dir_config,
+ &dialup_module);
+ if (dcfg->bytes_per_second == 0) {
+ return DECLINED;
+ }
+ core_dir_config *ccfg = ap_get_module_config(r->per_dir_config,
+ &core_module);
+
+ apr_file_t *fd;
+
+ rv = apr_file_open(&fd, r->filename, APR_READ | APR_BINARY
+#if APR_HAS_SENDFILE
+ | ((ccfg->enable_sendfile == ENABLE_SENDFILE_OFF)
+ ? 0 : APR_SENDFILE_ENABLED)
+#endif
+ , 0, r->pool);
+
+ if (rv) {
+ return DECLINED;
+ }
+
+ /* copied from default handler: */
+ ap_update_mtime(r, r->finfo.mtime);
+ ap_set_last_modified(r);
+ ap_set_etag(r);
+ apr_table_setn(r->headers_out, "Accept-Ranges", "bytes");
+ ap_set_content_length(r, r->finfo.size);
+
+ status = ap_meets_conditions(r);
+ if (status != OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "dialup: declined, meets conditions, good luck core handler");
+ return DECLINED;
+ }
+
+ apr_bucket_brigade *bb;
+
+ dialup_baton_t *db = apr_palloc(r->pool, sizeof(dialup_baton_t));
+
+ db->bb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+ db->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
+
+ apr_bucket *e;
+
+ e = apr_brigade_insert_file(db->bb, fd, 0, r->finfo.size, r->pool);
+
+#if APR_HAS_MMAP
+ if (ccfg->enable_mmap == ENABLE_MMAP_OFF) {
+ apr_bucket_file_enable_mmap(e, 0);
+ }
+#endif
+
+
+ db->bytes_per_second = dcfg->bytes_per_second;
+ db->r = r;
+ db->fd = fd;
+
+ e = apr_bucket_eos_create(r->connection->bucket_alloc);
+
+ APR_BRIGADE_INSERT_TAIL(db->bb, e);
+
+ status = dialup_send_pulse(db);
+ if (status != SUSPENDED && status != DONE) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "dialup: failed, send pulse");
+ return status;
+ }
+
+ ap_mpm_register_timed_callback(apr_time_from_sec(1), dialup_callback, db);
+
+ return SUSPENDED;
+}
+
+
+
+#ifndef APR_HOOK_ALMOST_LAST
+#define APR_HOOK_ALMOST_LAST (APR_HOOK_REALLY_LAST - 1)
+#endif
+
+static void
+dialup_register_hooks(apr_pool_t *p)
+{
+ ap_hook_handler(dialup_handler, NULL, NULL, APR_HOOK_ALMOST_LAST);
+}
+
+typedef struct modem_speed_t {
+ const char *name;
+ apr_size_t bytes_per_second;
+} modem_speed_t;
+
+#ifndef BITRATE_TO_BYTES
+#define BITRATE_TO_BYTES(x) ((1000 * x)/8)
+#endif
+
+static const modem_speed_t modem_bitrates[] =
+{
+ {"V.21", BITRATE_TO_BYTES(0.1)},
+ {"V.26bis", BITRATE_TO_BYTES(2.4)},
+ {"V.32", BITRATE_TO_BYTES(9.6)},
+ {"V.34", BITRATE_TO_BYTES(28.8)},
+ {"V.92", BITRATE_TO_BYTES(56.0)},
+ {"i-was-rich-and-got-a-leased-line", BITRATE_TO_BYTES(1500)},
+ {NULL, 0}
+};
+
+static const char *
+cmd_modem_standard(cmd_parms *cmd,
+ void *dconf,
+ const char *input)
+{
+ const modem_speed_t *standard;
+ int i = 0;
+ dialup_dcfg_t *dcfg = (dialup_dcfg_t*)dconf;
+
+ dcfg->bytes_per_second = 0;
+
+ while (modem_bitrates[i].name != NULL) {
+ standard = &modem_bitrates[i];
+ if (strcasecmp(standard->name, input) == 0) {
+ dcfg->bytes_per_second = standard->bytes_per_second;
+ break;
+ }
+ i++;
+ }
+
+ if (dcfg->bytes_per_second == 0) {
+ return "mod_diaulup: Unkonwn Modem Standard specified.";
+ }
+
+ return NULL;
+}
+
+static void *
+dialup_dcfg_create(apr_pool_t *p, char *dummy)
+{
+ dialup_dcfg_t *cfg = apr_palloc(p, sizeof(dialup_dcfg_t));
+
+ cfg->bytes_per_second = 0;
+
+ return cfg;
+}
+
+
+static const command_rec dialup_cmds[] =
+{
+ AP_INIT_TAKE1("ModemStandard", cmd_modem_standard, NULL, ACCESS_CONF,
+ "Modem Standard to.. simulate. "
+ "Must be one of: 'V.21', 'V.26bis', 'V.32', 'V.34', or 'V.92'"),
+ NULL
+};
+
+module AP_MODULE_DECLARE_DATA dialup_module =
+{
+ STANDARD20_MODULE_STUFF,
+ dialup_dcfg_create,
+ NULL,
+ NULL,
+ NULL,
+ dialup_cmds,
+ dialup_register_hooks
+};
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
"handler \"%s\" not found for: %s", r->handler, r->filename);
}
- if ((result != OK) && (result != DONE) && (result != DECLINED)
+ if ((result != OK) && (result != DONE) && (result != DECLINED) && (result != SUSPENDED)
&& !ap_is_HTTP_VALID_RESPONSE(result)) {
/* If a module is deliberately returning something else
* (request_rec in non-HTTP or proprietary extension?)
* like the Worker MPM does.
*/
ap_run_process_connection(c);
- cs->state = CONN_STATE_LINGER;
+ if (cs->state != CONN_STATE_SUSPENDED) {
+ cs->state = CONN_STATE_LINGER;
+ }
}
read_request:
return APR_SUCCESS;
}
+static apr_status_t push_timer2worker(timer_event_t* te)
+{
+ return ap_queue_push_timer(worker_queue, te);
+}
+
static apr_status_t push2worker(const apr_pollfd_t * pfd,
apr_pollset_t * pollset)
{
}
}
+/* XXXXXX: Convert to skiplist or other better data structure
+ * (yes, this is VERY VERY VERY VERY BAD)
+ */
+
+/* Structures to reuse */
+static APR_RING_HEAD(timer_free_ring_t, timer_event_t) timer_free_ring;
+/* Active timers */
+static APR_RING_HEAD(timer_ring_t, timer_event_t) timer_ring;
+
+static apr_thread_mutex_t *g_timer_ring_mtx;
+
+AP_DECLARE(void) ap_mpm_register_timed_callback(apr_time_t t,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton)
+{
+ timer_event_t *ep;
+ timer_event_t *te;
+ /* oh yeah, and make locking smarter/fine grained. */
+ apr_thread_mutex_lock(g_timer_ring_mtx);
+
+ if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
+ te = APR_RING_FIRST(&timer_free_ring);
+ APR_RING_REMOVE(te, link);
+ }
+ else {
+ /* XXXXX: lol, pool allocation without a context from any thread.Yeah. Right. MPMs Suck. */
+ te = malloc(sizeof(timer_event_t));
+ APR_RING_ELEM_INIT(te, link);
+ }
+
+ te->cbfunc = cbfn;
+ te->baton = baton;
+ /* XXXXX: optimize */
+ te->when = t + apr_time_now();
+
+ /* Okay, insert sorted by when.. */
+ int inserted = 0;
+ for (ep = APR_RING_FIRST(&timer_ring);
+ ep != APR_RING_SENTINEL(&timer_ring,
+ timer_event_t, link);
+ ep = APR_RING_NEXT(ep, link))
+ {
+ if (ep->when > te->when) {
+ inserted = 1;
+ APR_RING_INSERT_BEFORE(ep, te, link);
+ break;
+ }
+ }
+
+ if (!inserted) {
+ APR_RING_INSERT_TAIL(&timer_ring, te, timer_event_t, link);
+ }
+
+ apr_thread_mutex_unlock(g_timer_ring_mtx);
+}
+
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) (x * 1000)
+#endif
+
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
{
+ timer_event_t *ep;
+ timer_event_t *te;
apr_status_t rc;
proc_info *ti = dummy;
int process_slot = ti->pid;
free(ti);
- /* We set this to force apr_pollset to wakeup if there hasn't been any IO
- * on any of its sockets. This allows sockets to have been added
- * when no other keepalive operations where going on.
- *
- * current value is 1 second
- */
- timeout_interval = 1000000;
-
/* the following times out events that are really close in the future
* to prevent extra poll calls
*
* current value is .1 second
*/
#define TIMEOUT_FUDGE_FACTOR 100000
+#define EVENT_FUDGE_FACTOR 10000
rc = init_pollset(tpool);
if (rc != APR_SUCCESS) {
check_infinite_requests();
}
+
+ {
+ apr_time_t now = apr_time_now();
+ apr_thread_mutex_lock(g_timer_ring_mtx);
+
+ if (!APR_RING_EMPTY(&timer_ring, timer_event_t, link)) {
+ te = APR_RING_FIRST(&timer_ring);
+ if (te->when > now) {
+ timeout_interval = te->when - now;
+ }
+ else {
+ timeout_interval = 1;
+ }
+ }
+ else {
+ timeout_interval = apr_time_from_msec(100);
+ }
+ apr_thread_mutex_unlock(g_timer_ring_mtx);
+ }
+
rc = apr_pollset_poll(event_pollset, timeout_interval, &num,
&out_pfd);
if (listener_may_exit)
break;
+ {
+ apr_time_t now = apr_time_now();
+ apr_thread_mutex_lock(g_timer_ring_mtx);
+ for (ep = APR_RING_FIRST(&timer_ring);
+ ep != APR_RING_SENTINEL(&timer_ring,
+ timer_event_t, link);
+ ep = APR_RING_FIRST(&timer_ring))
+ {
+ if (ep->when < now + EVENT_FUDGE_FACTOR) {
+ APR_RING_REMOVE(ep, link);
+ push_timer2worker(ep);
+ }
+ else {
+ break;
+ }
+ }
+ apr_thread_mutex_unlock(g_timer_ring_mtx);
+ }
+
while (num && get_worker(&have_idle_worker)) {
pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
apr_status_t rv;
int is_idle = 0;
-
+ timer_event_t *te = NULL;
+
free(ti);
ap_scoreboard_image->servers[process_slot][thread_slot].pid = ap_my_pid;
if (workers_may_exit) {
break;
}
- rv = ap_queue_pop(worker_queue, &csd, &cs, &ptrans);
+
+ te = NULL;
+
+ rv = ap_queue_pop_something(worker_queue, &csd, &cs, &ptrans, &te);
if (rv != APR_SUCCESS) {
/* We get APR_EOF during a graceful shutdown once all the
}
continue;
}
- is_idle = 0;
- worker_sockets[thread_slot] = csd;
- rv = process_socket(ptrans, csd, cs, process_slot, thread_slot);
- if (!rv) {
- requests_this_child--;
+ if (te != NULL) {
+
+ te->cbfunc(te->baton);
+
+ {
+ apr_thread_mutex_lock(g_timer_ring_mtx);
+ APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t, link);
+ apr_thread_mutex_unlock(g_timer_ring_mtx);
+ }
+ }
+ else {
+ is_idle = 0;
+ worker_sockets[thread_slot] = csd;
+ rv = process_socket(ptrans, csd, cs, process_slot, thread_slot);
+ if (!rv) {
+ requests_this_child--;
+ }
+ worker_sockets[thread_slot] = NULL;
}
- worker_sockets[thread_slot] = NULL;
}
ap_update_child_status_from_indexes(process_slot, thread_slot,
clean_child_exit(APEXIT_CHILDFATAL);
}
+ apr_thread_mutex_create(&g_timer_ring_mtx, APR_THREAD_MUTEX_DEFAULT, pchild);
+ APR_RING_INIT(&timer_free_ring, timer_event_t, link);
+ APR_RING_INIT(&timer_ring, timer_event_t, link);
+
ap_run_child_init(pchild, ap_server_conf);
/* done with init critical section */
* Detects when the fd_queue_t is empty. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
*/
-#define ap_queue_empty(queue) ((queue)->nelts == 0)
+#define ap_queue_empty(queue) ((queue)->nelts == 0 && APR_RING_EMPTY(&queue->timers ,timer_event_t, link))
/**
* Callback routine that is called to destroy this
return rv;
}
+ APR_RING_INIT(&queue->timers, timer_event_t, link);
+
queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
queue->bounds = queue_capacity;
queue->nelts = 0;
return APR_SUCCESS;
}
+apr_status_t ap_queue_push_timer(fd_queue_t * queue, timer_event_t *te)
+{
+ apr_status_t rv;
+
+ if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
+ return rv;
+ }
+
+ AP_DEBUG_ASSERT(!queue->terminated);
+
+ APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link);
+
+ apr_thread_cond_signal(queue->not_empty);
+
+ if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
+ return rv;
+ }
+
+ return APR_SUCCESS;
+}
+
/**
* Retrieves the next available socket from the queue. If there are no
* sockets available, it will block until one becomes available.
* Once retrieved, the socket is placed into the address specified by
* 'sd'.
*/
-apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
- conn_state_t ** cs, apr_pool_t ** p)
+apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
+ conn_state_t ** cs, apr_pool_t ** p,
+ timer_event_t ** te_out)
{
fd_queue_elem_t *elem;
apr_status_t rv;
}
}
- elem = &queue->data[--queue->nelts];
- *sd = elem->sd;
- *cs = elem->cs;
- *p = elem->p;
+ *te_out = NULL;
+
+ if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) {
+ *te_out = APR_RING_FIRST(&queue->timers);
+ APR_RING_REMOVE(*te_out, link);
+ }
+ else {
+ elem = &queue->data[--queue->nelts];
+ *sd = elem->sd;
+ *cs = elem->cs;
+ *p = elem->p;
#ifdef AP_DEBUG
- elem->sd = NULL;
- elem->p = NULL;
+ elem->sd = NULL;
+ elem->p = NULL;
#endif /* AP_DEBUG */
-
+ }
+
rv = apr_thread_mutex_unlock(queue->one_big_mutex);
return rv;
}
#endif
#include <apr_errno.h>
+#include "ap_mpm.h"
+
typedef struct fd_queue_info_t fd_queue_info_t;
apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info,
};
typedef struct fd_queue_elem_t fd_queue_elem_t;
+typedef struct timer_event_t timer_event_t;
+
+struct timer_event_t {
+ APR_RING_ENTRY(timer_event_t) link;
+ apr_time_t when;
+ ap_mpm_callback_fn_t *cbfunc;
+ void *baton;
+};
+
+
struct fd_queue_t
{
+ APR_RING_HEAD(timers_t, timer_event_t) timers;
fd_queue_elem_t *data;
int nelts;
int bounds;
apr_pool_t * a);
apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd,
conn_state_t * cs, apr_pool_t * p);
-apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
- conn_state_t ** cs, apr_pool_t ** p);
+apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te);
+apr_status_t ap_queue_pop_something(fd_queue_t * queue, apr_socket_t ** sd,
+ conn_state_t ** cs, apr_pool_t ** p,
+ timer_event_t ** te);
apr_status_t ap_queue_interrupt_all(fd_queue_t * queue);
apr_status_t ap_queue_term(fd_queue_t * queue);