#include <limits.h> /* for INT_MAX */
-#include "equeue.h"
-
#if HAVE_SERF
#include "mod_serf.h"
#include "serf.h"
static fd_queue_info_t *worker_queue_info;
static int mpm_state = AP_MPMQ_STARTING;
-typedef enum {
- TIMEOUT_WRITE_COMPLETION,
- TIMEOUT_KEEPALIVE,
- TIMEOUT_LINGER,
- TIMEOUT_SHORT_LINGER
-} timeout_type_e;
+static apr_thread_mutex_t *timeout_mutex;
struct event_conn_state_t {
/** APR_RING of expiration timeouts */
/** public parts of the connection state */
conn_state_t pub;
};
-
-typedef struct pollset_op_t {
- timeout_type_e timeout_type;
- event_conn_state_t *cs;
- const char *tag;
-} pollset_op_t;
-
-
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
+
struct timeout_queue {
struct timeout_head_t head;
int count;
* perform a non-graceful (forced) shutdown of the server.
*/
static apr_socket_t **worker_sockets;
-static ap_equeue_t **worker_equeues;
static void disable_listensocks(int process_slot)
{
#endif
}
-static void process_pollop(pollset_op_t *op)
-{
- apr_status_t rv;
- event_conn_state_t *cs = op->cs;
-
- switch (op->timeout_type) {
- case TIMEOUT_WRITE_COMPLETION:
- TO_QUEUE_APPEND(write_completion_q, cs);
- break;
- case TIMEOUT_KEEPALIVE:
- TO_QUEUE_APPEND(keepalive_q, cs);
- break;
- case TIMEOUT_LINGER:
- TO_QUEUE_APPEND(linger_q, cs);
- break;
- case TIMEOUT_SHORT_LINGER:
- TO_QUEUE_APPEND(short_linger_q, cs);
- break;
- }
-
- rv = apr_pollset_add(event_pollset, &op->cs->pfd);
-
- if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00467)
- "%s: apr_pollset_add failure", op->tag);
- }
-}
-
/*
* close our side of the connection
* Pre-condition: cs is not in any timeout queue and not in the pollset,
* timeout_mutex is not locked
* return: 0 if connection is fully closed,
* 1 if connection is lingering
- * may be called by listener or by worker thread.
- * the eq may be null if called from the listener thread,
- * and the pollset operations are done directly by this function.
+ * may be called by listener or by worker thread
*/
-static int start_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
+static int start_lingering_close(event_conn_state_t *cs)
{
apr_status_t rv;
cs->c->sbh = NULL; /* prevent scoreboard updates from the listener
- * worker will loop around soon and set SERVER_READY
+ * worker will loop around and set SERVER_READY soon
*/
if (ap_start_lingering_close(cs->c)) {
}
else {
apr_socket_t *csd = ap_get_conn_socket(cs->c);
- pollset_op_t localv;
- pollset_op_t *v;
-
- if (eq) {
- v = ap_equeue_writer_value(eq);
- }
- else {
- v = &localv;
- }
+ struct timeout_queue *q;
#ifdef AP_DEBUG
{
if (apr_table_get(cs->c->notes, "short-lingering-close")) {
cs->expiration_time =
apr_time_now() + apr_time_from_sec(SECONDS_TO_LINGER);
- v->timeout_type = TIMEOUT_SHORT_LINGER;
- v->tag = "start_lingering_close(short)";
+ q = &short_linger_q;
cs->pub.state = CONN_STATE_LINGER_SHORT;
}
else {
cs->expiration_time =
apr_time_now() + apr_time_from_sec(MAX_SECS_TO_LINGER);
- v->timeout_type = TIMEOUT_LINGER;
- v->tag = "start_lingering_close(normal)";
+ q = &linger_q;
cs->pub.state = CONN_STATE_LINGER_NORMAL;
}
-
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(*q, cs);
cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
- v->cs = cs;
- if (eq != NULL) {
- ap_equeue_writer_onward(eq);
- apr_pollset_wakeup(event_pollset);
- }
- else {
- process_pollop(v);
+ rv = apr_pollset_add(event_pollset, &cs->pfd);
+ apr_thread_mutex_unlock(timeout_mutex);
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+ "start_lingering_close: apr_pollset_add failure");
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_REMOVE(*q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ apr_socket_close(cs->pfd.desc.s);
+ apr_pool_clear(cs->p);
+ ap_push_pool(worker_queue_info, cs->p);
+ return 0;
}
}
return 1;
* Pre-condition: cs is not in any timeout queue and not in the pollset
* return: irrelevant (need same prototype as start_lingering_close)
*/
-static int stop_lingering_close(event_conn_state_t *cs, ap_equeue_t *eq)
+static int stop_lingering_close(event_conn_state_t *cs)
{
apr_status_t rv;
apr_socket_t *csd = ap_get_conn_socket(cs->c);
* 0 if it is still open and waiting for some event
*/
static int process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock,
- event_conn_state_t * cs,
- ap_equeue_t *eq,
- int my_child_num,
+ event_conn_state_t * cs, int my_child_num,
int my_thread_num)
{
conn_rec *c;
pt->type = PT_CSD;
pt->baton = cs;
cs->pfd.client_data = pt;
+ TO_QUEUE_ELEM_INIT(cs);
ap_update_vhost_given_ip(c);
 * Set a write timeout for this connection, and let the
 * event thread poll for writability.
 */
- pollset_op_t *v = ap_equeue_writer_value(eq);
-
cs->expiration_time = ap_server_conf->timeout + apr_time_now();
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(write_completion_q, cs);
cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
-
- v->cs = cs;
- v->timeout_type = TIMEOUT_WRITE_COMPLETION;
- v->tag = "process_socket(write_completion)";
-
- ap_equeue_writer_onward(eq);
- apr_pollset_wakeup(event_pollset);
+ rc = apr_pollset_add(event_pollset, &cs->pfd);
+ apr_thread_mutex_unlock(timeout_mutex);
return 1;
}
else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
}
if (cs->pub.state == CONN_STATE_LINGER) {
- if (!start_lingering_close(cs, eq)) {
+ if (!start_lingering_close(cs))
return 0;
- }
}
else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
- pollset_op_t *v;
+ apr_status_t rc;
/* It greatly simplifies the logic to use a single timeout value here
* because the new element can just be added to the end of the list and
*/
cs->expiration_time = ap_server_conf->keep_alive_timeout +
apr_time_now();
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(keepalive_q, cs);
/* Add work to pollset. */
- v = ap_equeue_writer_value(eq);
- v->timeout_type = TIMEOUT_KEEPALIVE;
- v->cs = cs;
cs->pfd.reqevents = APR_POLLIN;
- v->tag = "process_socket(keepalive)";
- ap_equeue_writer_onward(eq);
- apr_pollset_wakeup(event_pollset);
+ rc = apr_pollset_add(event_pollset, &cs->pfd);
+ apr_thread_mutex_unlock(timeout_mutex);
+
+ if (rc != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
+ "process_socket: apr_pollset_add failure");
+ AP_DEBUG_ASSERT(rc == APR_SUCCESS);
+ }
}
return 1;
}
return;
}
+ apr_thread_mutex_lock(timeout_mutex);
rv = apr_pollset_remove(event_pollset, pfd);
AP_DEBUG_ASSERT(rv == APR_SUCCESS);
AP_DEBUG_ASSERT(rv == APR_SUCCESS);
TO_QUEUE_REMOVE(*q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
TO_QUEUE_ELEM_INIT(cs);
apr_pool_clear(cs->p);
*/
static void process_timeout_queue(struct timeout_queue *q,
apr_time_t timeout_time,
- int (*func)(event_conn_state_t *, ap_equeue_t *eq))
+ int (*func)(event_conn_state_t *))
{
int count = 0;
event_conn_state_t *first, *cs, *last;
APR_RING_UNSPLICE(first, last, timeout_list);
AP_DEBUG_ASSERT(q->count >= count);
q->count -= count;
+ apr_thread_mutex_unlock(timeout_mutex);
while (count) {
cs = APR_RING_NEXT(first, timeout_list);
TO_QUEUE_ELEM_INIT(first);
- func(first, NULL);
+ func(first);
first = cs;
count--;
}
+ apr_thread_mutex_lock(timeout_mutex);
}
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
/* trace log status every second */
if (now - last_log > apr_time_from_msec(1000)) {
last_log = now;
+ apr_thread_mutex_lock(timeout_mutex);
ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
"connections: %d (write-completion: %d "
"keep-alive: %d lingering: %d)",
connection_count, write_completion_q.count,
keepalive_q.count,
linger_q.count + short_linger_q.count);
+ apr_thread_mutex_unlock(timeout_mutex);
}
}
}
#endif
rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
- if (rc != APR_SUCCESS
- && !APR_STATUS_IS_EINTR(rc)
- && !APR_STATUS_IS_TIMEUP(rc)) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
- "apr_pollset_poll failed. Attempting to "
- "shutdown process gracefully");
- signal_threads(ST_GRACEFUL);
+ if (rc != APR_SUCCESS) {
+ if (APR_STATUS_IS_EINTR(rc)) {
+ continue;
+ }
+ if (!APR_STATUS_IS_TIMEUP(rc)) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
+ "apr_pollset_poll failed. Attempting to "
+ "shutdown process gracefully");
+ signal_threads(ST_GRACEFUL);
+ }
}
if (listener_may_exit) {
/* one of the sockets is readable */
struct timeout_queue *remove_from_q = &write_completion_q;
int blocking = 1;
- cs = (event_conn_state_t *)pt->baton;
+ cs = (event_conn_state_t *) pt->baton;
switch (cs->pub.state) {
case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
case CONN_STATE_WRITE_COMPLETION:
get_worker(&have_idle_worker, blocking,
&workers_were_busy);
+ apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_REMOVE(*remove_from_q, cs);
rc = apr_pollset_remove(event_pollset, &cs->pfd);
if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
"pollset remove failed");
- start_lingering_close(cs, NULL);
+ apr_thread_mutex_unlock(timeout_mutex);
+ start_lingering_close(cs);
break;
}
+ apr_thread_mutex_unlock(timeout_mutex);
TO_QUEUE_ELEM_INIT(cs);
/* If we didn't get a worker immediately for a keep-alive
* request, we close the connection, so that the client can
* re-connect to a different process.
*/
if (!have_idle_worker) {
- start_lingering_close(cs, NULL);
+ start_lingering_close(cs);
break;
}
rc = push2worker(out_pfd, event_pollset);
}
}
else if (pt->type == PT_ACCEPT) {
- int skip_accept = 0;
- int connection_count_local = connection_count;
-
/* A Listener Socket is ready for an accept() */
if (workers_were_busy) {
- skip_accept = 1;
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
+ listeners_disabled = 1;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
"All workers busy, not accepting new conns"
"in this process");
}
- else if (listeners_disabled) {
- listeners_disabled = 0;
- enable_listensocks(process_slot);
- }
- else if (connection_count_local > threads_per_child
+ else if (apr_atomic_read32(&connection_count) > threads_per_child
+ ap_queue_info_get_idlers(worker_queue_info) *
worker_factor / WORKER_FACTOR_SCALE)
{
- skip_accept = 1;
+ if (!listeners_disabled)
+ disable_listensocks(process_slot);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
"Too many open connections (%u), "
"not accepting new conns in this process",
- connection_count_local);
+ apr_atomic_read32(&connection_count));
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
"Idle workers: %u",
ap_queue_info_get_idlers(worker_queue_info));
+ listeners_disabled = 1;
}
-
- if (skip_accept == 0) {
+ else if (listeners_disabled) {
+ listeners_disabled = 0;
+ enable_listensocks(process_slot);
+ }
+ if (!listeners_disabled) {
lr = (ap_listen_rec *) pt->baton;
ap_pop_pool(&ptrans, worker_queue_info);
num--;
} /* while for processing poll */
- {
- /* TODO: break out to separate function */
- int i;
-
- for (i = 0; i < threads_per_child; i++) {
- ap_equeue_t *eq = worker_equeues[i];
- pollset_op_t *op = NULL;
-
- while ((op = ap_equeue_reader_next(eq)) != NULL) {
- process_pollop(op);
- }
- }
- }
-
/* XXX possible optimization: stash the current time for use as
* r->request_time for new requests
*/
timeout_time = now + TIMEOUT_FUDGE_FACTOR;
/* handle timed out sockets */
+ apr_thread_mutex_lock(timeout_mutex);
/* Step 1: keepalive timeouts */
/* If all workers are busy, we kill older keep-alive connections so that they
ps->write_completion = write_completion_q.count;
ps->lingering_close = linger_q.count + short_linger_q.count;
ps->keep_alive = keepalive_q.count;
+ apr_thread_mutex_unlock(timeout_mutex);
ps->connections = apr_atomic_read32(&connection_count);
/* XXX: should count CONN_STATE_SUSPENDED and set ps->suspended */
apr_status_t rv;
int is_idle = 0;
timer_event_t *te = NULL;
- ap_equeue_t *eq = worker_equeues[thread_slot];
free(ti);
else {
is_idle = 0;
worker_sockets[thread_slot] = csd;
- rv = process_socket(thd, ptrans, csd, cs, eq, process_slot, thread_slot);
+ rv = process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
if (!rv) {
requests_this_child--;
}
clean_child_exit(APEXIT_CHILDFATAL);
}
+ /* Create the timeout mutex and main pollset before the listener
+ * thread starts.
+ */
+ rv = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT,
+ pchild);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+ "creation of the timeout mutex failed.");
+ clean_child_exit(APEXIT_CHILDFATAL);
+ }
+
/* Create the main pollset */
rv = apr_pollset_create(&event_pollset,
threads_per_child, /* XXX don't we need more, to handle
* connections in K-A or lingering
* close?
*/
- pchild, APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY);
+ pchild, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
- "apr_pollset_create failed; check system or user limits");
+ "apr_pollset_create with Thread Safety failed.");
clean_child_exit(APEXIT_CHILDFATAL);
}
worker_sockets = apr_pcalloc(pchild, threads_per_child
* sizeof(apr_socket_t *));
- worker_equeues = apr_palloc(pchild, threads_per_child * sizeof(ap_equeue_t*));
-
- for (i = 0; i < threads_per_child; i++) {
- ap_equeue_t* eq = NULL;
- /* TODO: research/test optimal size of queue here */
- ap_equeue_create(pchild, 16, sizeof(pollset_op_t), &eq);
- /* same as thread ID */
- worker_equeues[i] = eq;
- }
-
loops = prev_threads_created = 0;
while (1) {
/* threads_per_child does not include the listener thread */
++retained->module_loads;
if (retained->module_loads == 2) {
rv = apr_pollset_create(&event_pollset, 1, plog,
- APR_POLLSET_WAKEABLE|APR_POLLSET_NOCOPY);
+ APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO(00495)
- "apr_pollset_create failed; check system or user limits");
+ "Couldn't create a Thread Safe Pollset. "
+ "Is it supported on your platform?"
+ "Also check system or user limits!");
return HTTP_INTERNAL_SERVER_ERROR;
}
apr_pollset_destroy(event_pollset);