module AP_MODULE_DECLARE_DATA mpm_event_module;
+/* forward declare */
+struct event_srv_cfg_s;
+typedef struct event_srv_cfg_s event_srv_cfg;
+
struct event_conn_state_t {
/** APR_RING of expiration timeouts */
APR_RING_ENTRY(event_conn_state_t) timeout_list;
- /** the expiration time of the next keepalive timeout */
- apr_time_t expiration_time;
+ /** the time when the entry was queued */
+ apr_time_t queue_timestamp;
/** connection record this struct refers to */
conn_rec *c;
/** request record (if any) this struct refers to */
request_rec *r;
+ /** server config this struct refers to */
+ event_srv_cfg *sc;
    /** is the current conn_rec suspended? (disassociated with
     * a particular MPM thread; for suspend_/resume_connection
     * hooks)
     */
    int suspended;
struct timeout_queue {
struct timeout_head_t head;
- int count;
- const char *tag;
+    int count, *total;            /* in this queue / in all chained queues */
+    apr_interval_time_t timeout;  /* timeout common to this queue's entries */
+    struct timeout_queue *next;   /* queue with the next distinct timeout */
};
/*
 * Several timeout queues that use different timeouts, so that we can always
 * simply append to the end.
- * write_completion_q uses TimeOut
- * keepalive_q uses KeepAliveTimeOut
+ * write_completion_q uses each vhost's TimeOut
+ * keepalive_q uses each vhost's KeepAliveTimeOut
* linger_q uses MAX_SECS_TO_LINGER
* short_linger_q uses SECONDS_TO_LINGER
*/
-static struct timeout_queue write_completion_q, keepalive_q, linger_q,
- short_linger_q;
+static struct timeout_queue *write_completion_q,
+ *keepalive_q,
+ *linger_q,
+ *short_linger_q;
+
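+/* write_completion_q and keepalive_q now head chains of queues (linked
+ * via ->next), one queue per distinct timeout value in the configuration,
+ * each chain sharing a single *total counter; event_post_config() below
+ * builds the chains, while linger_q and short_linger_q remain single
+ * queues. */
+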
static apr_pollfd_t *listener_pollfd;
/*
* Macros for accessing struct timeout_queue.
* For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
*/
-#define TO_QUEUE_APPEND(q, el) \
- do { \
- APR_RING_INSERT_TAIL(&(q).head, el, event_conn_state_t, timeout_list); \
- (q).count++; \
+#define TO_QUEUE_APPEND(q, el) \
+ do { \
+ APR_RING_INSERT_TAIL(&(q)->head, el, event_conn_state_t, \
+ timeout_list); \
+ ++*(q)->total; \
+ ++(q)->count; \
} while (0)
-#define TO_QUEUE_REMOVE(q, el) \
- do { \
- APR_RING_REMOVE(el, timeout_list); \
- (q).count--; \
+#define TO_QUEUE_REMOVE(q, el) \
+ do { \
+ APR_RING_REMOVE(el, timeout_list); \
+ --*(q)->total; \
+ --(q)->count; \
} while (0)
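+/* Both macros above update the per-queue count and the shared *total, so
+ * that a whole chain of queues can be tested for emptiness in O(1) (see
+ * process_timeout_queue()) and reported as one figure in the scoreboard. */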
-#define TO_QUEUE_INIT(q) \
- do { \
- APR_RING_INIT(&(q).head, event_conn_state_t, timeout_list); \
- (q).tag = #q; \
+#define TO_QUEUE_INIT(q, p, t, v) \
+ do { \
+ struct timeout_queue *b = (v); \
+ (q) = apr_palloc((p), sizeof *(q)); \
+ APR_RING_INIT(&(q)->head, event_conn_state_t, timeout_list); \
+ (q)->total = (b) ? (b)->total : apr_pcalloc((p), sizeof *(q)->total); \
+ (q)->count = 0; \
+ (q)->timeout = (t); \
+ (q)->next = NULL; \
} while (0)
#define TO_QUEUE_ELEM_INIT(el) APR_RING_ELEM_INIT(el, timeout_list)
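+
+/* Usage sketch for TO_QUEUE_INIT() (names and timeouts are hypothetical):
+ * two queues share one grand total by passing the first queue as the
+ * fourth argument when initializing the second, then get chained by hand:
+ *
+ *     struct timeout_queue *q5, *q7;
+ *     TO_QUEUE_INIT(q5, pconf, apr_time_from_sec(5), NULL);
+ *     TO_QUEUE_INIT(q7, pconf, apr_time_from_sec(7), q5);
+ *     q5->next = q7;
+ *
+ * q5 allocates a fresh total, q7 shares q5's, and the ->next link is what
+ * process_timeout_queue() walks below.
+ */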
static event_child_bucket *all_buckets, /* All listeners buckets */
*my_bucket; /* Current child bucket */
+struct event_srv_cfg_s {
+    struct timeout_queue *wc_q,   /* write completion queue (TimeOut) */
+                         *ka_q;   /* keepalive queue (KeepAliveTimeOut) */
+};
+
#define ID_FROM_CHILD_THREAD(c, t) ((c * thread_limit) + t)
/* The event MPM respects a couple of runtime flags that can aid
#else
apr_socket_timeout_set(csd, 0);
#endif
+ cs->queue_timestamp = apr_time_now();
/*
* If some module requested a shortened waiting period, only wait for
* 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
* DoS attacks.
*/
if (apr_table_get(cs->c->notes, "short-lingering-close")) {
- cs->expiration_time =
- apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
- q = &short_linger_q;
+ q = short_linger_q;
cs->pub.state = CONN_STATE_LINGER_SHORT;
}
else {
- cs->expiration_time =
- apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
- q = &linger_q;
+ q = linger_q;
cs->pub.state = CONN_STATE_LINGER_NORMAL;
}
apr_atomic_inc32(&lingering_count);
cs->c->sbh = NULL;
}
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(*q, cs);
+ TO_QUEUE_APPEND(q, cs);
cs->pfd.reqevents = (
cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
"start_lingering_close: apr_pollset_add failure");
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_REMOVE(*q, cs);
+ TO_QUEUE_REMOVE(q, cs);
apr_thread_mutex_unlock(timeout_mutex);
apr_socket_close(cs->pfd.desc.s);
ap_push_pool(worker_queue_info, cs->p);
&mpm_event_module);
cs->r = r;
+ cs->sc = ap_get_module_config(ap_server_conf->module_config,
+ &mpm_event_module);
apr_pool_cleanup_register(r->pool, c, event_request_cleanup,
apr_pool_cleanup_null);
}
+/*
+ * event_post_read_request() tracks the current server config for a
+ * given request.
+ */
+static int event_post_read_request(request_rec *r)
+{
+ conn_rec *c = r->connection;
+ event_conn_state_t *cs = ap_get_module_config(c->conn_config,
+ &mpm_event_module);
+
+ /* To preserve legacy behaviour (consistent with other MPMs), use
+ * the keepalive timeout from the base server (first on this IP:port)
+ * when none is explicitly configured on this server.
+ */
+ if (r->server->keep_alive_timeout_set) {
+ cs->sc = ap_get_module_config(r->server->module_config,
+ &mpm_event_module);
+ }
+ else {
+ cs->sc = ap_get_module_config(c->base_server->module_config,
+ &mpm_event_module);
+ }
+ return OK;
+}
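+
+/* For illustration (hypothetical config): with
+ *     KeepAliveTimeout 5        (base server, first on this IP:port)
+ *     <VirtualHost *:80>        (sets no KeepAliveTimeout of its own)
+ * requests served by that vhost keep using the base server's 5 second
+ * keepalive queue, since the vhost's keep_alive_timeout_set is 0.
+ */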
+
/*
* process one connection in the worker
*/
cs->c = c;
c->cs = &(cs->pub);
cs->p = p;
+ cs->sc = ap_get_module_config(ap_server_conf->module_config,
+ &mpm_event_module);
cs->pfd.desc_type = APR_POLL_SOCKET;
cs->pfd.reqevents = APR_POLLIN;
cs->pfd.desc.s = sock;
* Set a write timeout for this connection, and let the
* event thread poll for writeability.
*/
- cs->expiration_time = ap_server_conf->timeout + apr_time_now();
+ cs->queue_timestamp = apr_time_now();
notify_suspend(cs);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(write_completion_q, cs);
+ TO_QUEUE_APPEND(cs->sc->wc_q, cs);
cs->pfd.reqevents = (
cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
start_lingering_close_blocking(cs);
}
else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
- /* It greatly simplifies the logic to use a single timeout value here
+        /* It greatly simplifies the logic to use a single timeout value per queue
* because the new element can just be added to the end of the list and
* it will stay sorted in expiration time sequence. If brand new
* sockets are sent to the event thread for a readability check, this
* timeout today. With a normal client, the socket will be readable in
* a few milliseconds anyway.
*/
- cs->expiration_time = ap_server_conf->keep_alive_timeout +
- apr_time_now();
+ cs->queue_timestamp = apr_time_now();
notify_suspend(cs);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(keepalive_q, cs);
+ TO_QUEUE_APPEND(cs->sc->ka_q, cs);
/* Add work to pollset. */
cs->pfd.reqevents = APR_POLLIN;
c->suspended_baton = NULL;
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(write_completion_q, cs);
+ TO_QUEUE_APPEND(cs->sc->wc_q, cs);
cs->pfd.reqevents = (
cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
listener_poll_type *pt;
int i = 0;
- TO_QUEUE_INIT(write_completion_q);
- TO_QUEUE_INIT(keepalive_q);
- TO_QUEUE_INIT(linger_q);
- TO_QUEUE_INIT(short_linger_q);
-
listener_pollfd = apr_palloc(p, sizeof(apr_pollfd_t) * num_listensocks);
for (lr = my_bucket->listeners; lr != NULL; lr = lr->next, i++) {
apr_pollfd_t *pfd;
apr_size_t nbytes;
apr_status_t rv;
struct timeout_queue *q;
- q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? &short_linger_q : &linger_q;
+ q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
/* socket is already in non-blocking state */
do {
rv = apr_socket_close(csd);
AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- TO_QUEUE_REMOVE(*q, cs);
+ TO_QUEUE_REMOVE(q, cs);
apr_thread_mutex_unlock(timeout_mutex);
TO_QUEUE_ELEM_INIT(cs);
apr_time_t timeout_time,
int (*func)(event_conn_state_t *))
{
- int count = 0;
+ int total = 0, count;
event_conn_state_t *first, *cs, *last;
+ struct timeout_head_t trash;
+ struct timeout_queue *qp;
apr_status_t rv;
- if (!q->count) {
+
+ if (!*q->total) {
return;
}
- AP_DEBUG_ASSERT(!APR_RING_EMPTY(&q->head, event_conn_state_t, timeout_list));
- cs = first = APR_RING_FIRST(&q->head);
- while (cs != APR_RING_SENTINEL(&q->head, event_conn_state_t, timeout_list)
- && cs->expiration_time < timeout_time) {
- last = cs;
- rv = apr_pollset_remove(event_pollset, &cs->pfd);
- if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
- "apr_pollset_remove failed");
+ APR_RING_INIT(&trash, event_conn_state_t, timeout_list);
+ for (qp = q; qp; qp = qp->next) {
+ count = 0;
+ cs = first = APR_RING_FIRST(&qp->head);
+ while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
+ timeout_list)
+               /* Trash the entry if:
+                * - no timeout_time was given (the caller asked for all), or
+                * - it expired (according to the queue timeout), or
+                * - the system clock was set back: no entry should be
+                *   stamped after the given timeout_time (~now) + the queue
+                *   timeout, so don't keep such entries around (e.g. for
+                *   centuries).
+                * Stop otherwise: no following entry can match, thanks to
+                * the single timeout per queue (entries are appended at the
+                * end!). This allows maintenance in O(1).
+                */
+ && (!timeout_time
+ || cs->queue_timestamp + qp->timeout < timeout_time
+ || cs->queue_timestamp > timeout_time + qp->timeout)) {
+ last = cs;
+ rv = apr_pollset_remove(event_pollset, &cs->pfd);
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
+ "apr_pollset_remove failed");
+ }
+ cs = APR_RING_NEXT(cs, timeout_list);
+ count++;
}
- cs = APR_RING_NEXT(cs, timeout_list);
- count++;
+ if (!count)
+ continue;
+
+ APR_RING_UNSPLICE(first, last, timeout_list);
+ APR_RING_SPLICE_TAIL(&trash, first, last, event_conn_state_t,
+ timeout_list);
+ qp->count -= count;
+ total += count;
}
- if (!count)
+ if (!total)
return;
- APR_RING_UNSPLICE(first, last, timeout_list);
- AP_DEBUG_ASSERT(q->count >= count);
- q->count -= count;
+ AP_DEBUG_ASSERT(*q->total >= total);
+ *q->total -= total;
apr_thread_mutex_unlock(timeout_mutex);
- while (count) {
+ first = APR_RING_FIRST(&trash);
+ do {
cs = APR_RING_NEXT(first, timeout_list);
TO_QUEUE_ELEM_INIT(first);
func(first);
first = cs;
- count--;
- }
+ } while (--total);
apr_thread_mutex_lock(timeout_mutex);
}
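+
+/* Worked expiry example (hypothetical numbers): an entry stamped at
+ * t=100s on a queue whose timeout is 5s is trashed once timeout_time
+ * passes t=105s (queue_timestamp + timeout < timeout_time). If the clock
+ * is set back so that an entry is stamped at t=200s while timeout_time
+ * is ~100s (queue_timestamp > timeout_time + timeout), it is trashed as
+ * well rather than surviving for the whole skew. */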
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
- write_completion_q.count,
- keepalive_q.count,
+ *write_completion_q->total,
+ *keepalive_q->total,
apr_atomic_read32(&lingering_count),
apr_atomic_read32(&suspended_count));
apr_thread_mutex_unlock(timeout_mutex);
listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
/* one of the sockets is readable */
- struct timeout_queue *remove_from_q = &write_completion_q;
- int blocking = 1;
event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
+ struct timeout_queue *remove_from_q = cs->sc->wc_q;
+ int blocking = 1;
+
switch (cs->pub.state) {
case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
- remove_from_q = &keepalive_q;
+ remove_from_q = cs->sc->ka_q;
/* don't wait for a worker for a keepalive request */
blocking = 0;
/* FALL THROUGH */
get_worker(&have_idle_worker, blocking,
&workers_were_busy);
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_REMOVE(*remove_from_q, cs);
+ TO_QUEUE_REMOVE(remove_from_q, cs);
rc = apr_pollset_remove(event_pollset, &cs->pfd);
+ apr_thread_mutex_unlock(timeout_mutex);
/*
* Some of the pollset backends, like KQueue or Epoll
if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
"pollset remove failed");
- apr_thread_mutex_unlock(timeout_mutex);
start_lingering_close_nonblocking(cs);
break;
}
- apr_thread_mutex_unlock(timeout_mutex);
TO_QUEUE_ELEM_INIT(cs);
/* If we didn't get a worker immediately for a keep-alive
* request, we close the connection, so that the client can
* r->request_time for new requests
*/
now = apr_time_now();
- /* we only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR) */
- if (now > timeout_time) {
+        /* We only do this once per 0.1s (TIMEOUT_FUDGE_FACTOR), or on a
+         * clock skew: if the system time was set back in the meantime,
+         * timeout_time will exceed now + TIMEOUT_FUDGE_FACTOR, which can't
+         * happen otherwise.
+         */
+        if (now > timeout_time || now + TIMEOUT_FUDGE_FACTOR < timeout_time) {
struct process_score *ps;
timeout_time = now + TIMEOUT_FUDGE_FACTOR;
        /* If all workers are busy, we kill older keep-alive connections so
         * that their clients may connect to another process.
         */
- if (workers_were_busy && keepalive_q.count) {
+ if (workers_were_busy && *keepalive_q->total) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
"All workers are busy, will close %d keep-alive "
"connections",
- keepalive_q.count);
- process_timeout_queue(&keepalive_q,
- timeout_time + ap_server_conf->keep_alive_timeout,
+ *keepalive_q->total);
+ process_timeout_queue(keepalive_q, 0,
start_lingering_close_nonblocking);
}
else {
- process_timeout_queue(&keepalive_q, timeout_time,
+ process_timeout_queue(keepalive_q, timeout_time,
start_lingering_close_nonblocking);
}
/* Step 2: write completion timeouts */
- process_timeout_queue(&write_completion_q, timeout_time,
+ process_timeout_queue(write_completion_q, timeout_time,
start_lingering_close_nonblocking);
/* Step 3: (normal) lingering close completion timeouts */
- process_timeout_queue(&linger_q, timeout_time, stop_lingering_close);
+ process_timeout_queue(linger_q, timeout_time, stop_lingering_close);
/* Step 4: (short) lingering close completion timeouts */
- process_timeout_queue(&short_linger_q, timeout_time, stop_lingering_close);
+ process_timeout_queue(short_linger_q, timeout_time, stop_lingering_close);
ps = ap_get_scoreboard_process(process_slot);
- ps->write_completion = write_completion_q.count;
- ps->keep_alive = keepalive_q.count;
+ ps->write_completion = *write_completion_q->total;
+ ps->keep_alive = *keepalive_q->total;
apr_thread_mutex_unlock(timeout_mutex);
ps->connections = apr_atomic_read32(&connection_count);
return OK;
}
+static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog,
+ apr_pool_t *ptemp, server_rec *s)
+{
+ struct {
+ struct timeout_queue *tail, *q;
+ apr_hash_t *hash;
+ } wc, ka;
+
+ /* Not needed in pre_config stage */
+ if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) {
+ return OK;
+ }
+
+ wc.tail = ka.tail = NULL;
+ wc.hash = apr_hash_make(ptemp);
+ ka.hash = apr_hash_make(ptemp);
+
+ TO_QUEUE_INIT(linger_q, pconf,
+ apr_time_from_sec(MAX_SECS_TO_LINGER), NULL);
+ TO_QUEUE_INIT(short_linger_q, pconf,
+ apr_time_from_sec(SECONDS_TO_LINGER), NULL);
+
+ for (; s; s = s->next) {
+ event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc);
+
+ ap_set_module_config(s->module_config, &mpm_event_module, sc);
+ if (!wc.tail) {
+ /* The main server uses the global queues */
+ TO_QUEUE_INIT(wc.q, pconf, s->timeout, NULL);
+ apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
+ wc.tail = write_completion_q = wc.q;
+
+ TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, NULL);
+ apr_hash_set(ka.hash, &s->keep_alive_timeout,
+ sizeof s->keep_alive_timeout, ka.q);
+ ka.tail = keepalive_q = ka.q;
+ }
+ else {
+ /* The vhosts use any existing queue with the same timeout,
+             * or their own queue(s) if there is none */
+ wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout);
+ if (!wc.q) {
+ TO_QUEUE_INIT(wc.q, pconf, s->timeout, wc.tail);
+ apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
+ wc.tail = wc.tail->next = wc.q;
+ }
+
+ ka.q = apr_hash_get(ka.hash, &s->keep_alive_timeout,
+ sizeof s->keep_alive_timeout);
+ if (!ka.q) {
+ TO_QUEUE_INIT(ka.q, pconf, s->keep_alive_timeout, ka.tail);
+ apr_hash_set(ka.hash, &s->keep_alive_timeout,
+ sizeof s->keep_alive_timeout, ka.q);
+ ka.tail = ka.tail->next = ka.q;
+ }
+ }
+ sc->wc_q = wc.q;
+ sc->ka_q = ka.q;
+ }
+
+ return OK;
+}
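+
+/* Resulting layout for a hypothetical configuration: a main server with
+ * Timeout 60 plus two vhosts with Timeout 30 and Timeout 60 yields two
+ * write-completion queues: the 60s one (write_completion_q, found again
+ * for the second vhost via the ptemp hash) chained to a 30s one for the
+ * first vhost, both sharing the same *total counter.
+ */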
+
static int event_check_config(apr_pool_t *p, apr_pool_t *plog,
apr_pool_t *ptemp, server_rec *s)
{
* to retrieve it, so register as REALLY_FIRST
*/
ap_hook_pre_config(event_pre_config, NULL, NULL, APR_HOOK_REALLY_FIRST);
+ ap_hook_post_config(event_post_config, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_check_config(event_check_config, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm(event_run, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL,
APR_HOOK_MIDDLE);
ap_hook_pre_read_request(event_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE);
+ ap_hook_post_read_request(event_post_read_request, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm_resume_suspended(event_resume_suspended, NULL, NULL, APR_HOOK_MIDDLE);
}