static apr_status_t motorz_io_process(motorz_conn_t *scon);
static void clean_child_exit(int code) __attribute__ ((noreturn));
+static apr_pollset_t *motorz_pollset;
+static apr_skiplist *motorz_timer_ring;
+
static motorz_core_t *motorz_core_get()
{
return g_motorz_core;
{
motorz_timer_t *elem = (motorz_timer_t *)baton;
motorz_core_t *mz = elem->mz;
-
+
apr_thread_mutex_lock(mz->mtx);
apr_skiplist_remove(mz->timer_ring, elem, NULL);
apr_thread_mutex_unlock(mz->mtx);
-
+
return APR_SUCCESS;
}
motorz_conn_t *mzon = (motorz_conn_t *) baton;
/* pqXXXXX: handle timeouts. */
conn_rec *c = scon->c;
-
+
cs = NULL;
#endif
-
+
ap_log_error(APLOG_MARK, APLOG_WARNING, 0, ap_server_conf, APLOGNO(00247)
"io timeout hit (?)");
}
long conn_id = 0;
motorz_sb_t *sb;
motorz_conn_t *scon = (motorz_conn_t *) baton;
-
+
ap_create_sb_handle(&sbh, scon->pool, 0, 0);
scon->sbh = sbh;
scon->ba = apr_bucket_alloc_create(scon->pool);
-
+
scon->c = ap_run_create_connection(scon->pool, ap_server_conf, scon->sock,
conn_id, sbh, scon->ba);
/* XXX: handle failure */
-
+
scon->c->cs = &scon->cs;
sb = apr_pcalloc(scon->pool, sizeof(motorz_sb_t));
-
+
scon->c->current_thread = thread;
-
+
scon->pfd.p = scon->pool;
scon->pfd.desc_type = APR_POLL_SOCKET;
scon->pfd.desc.s = scon->sock;
scon->pfd.reqevents = APR_POLLIN;
-
+
sb->type = PT_CSD;
sb->baton = scon;
scon->pfd.client_data = sb;
-
+
ap_update_vhost_given_ip(scon->c);
-
+
status = ap_run_pre_connection(scon->c, scon->sock);
if (status != OK && status != DONE) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO()
"motorz_io_setup_conn: connection aborted");
scon->c->aborted = 1;
}
-
+
scon->cs.state = CONN_STATE_READ_REQUEST_LINE;
scon->cs.sense = CONN_SENSE_DEFAULT;
-
+
status = motorz_io_process(scon);
-
+
if (status) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, status, ap_server_conf, APLOGNO()
"motorz_io_setup_conn: motorz_io_process failed (?)");
apr_pool_t *ptrans;
apr_socket_t *socket;
ap_listen_rec *lr = (ap_listen_rec *) sb->baton;
-
+
apr_pool_create(&ptrans, NULL);
-
+
apr_pool_tag(ptrans, "transaction");
-
+
rv = lr->accept_func((void *)&socket, lr, ptrans);
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO()
scon->pool = ptrans;
scon->sock = socket;
scon->mz = mz;
-
+
return apr_thread_pool_push(mz->workers,
motorz_io_setup_conn,
scon,
APR_THREAD_TASK_PRIORITY_NORMAL, NULL);
}
-
+
return APR_SUCCESS;
}
static void motorz_timer_run(motorz_timer_t *ep)
{
apr_pool_cleanup_kill(ep->pool, ep, motorz_timer_pool_cleanup);
-
+
ep->cb(ep->mz, ep->baton);
}
static void *motorz_timer_invoke(apr_thread_t *thread, void *baton)
{
motorz_timer_t *ep = (motorz_timer_t *)baton;
-
+
motorz_timer_run(ep);
return NULL;
}
motorz_sb_t *sb = (motorz_sb_t *) baton;
motorz_conn_t *scon = (motorz_conn_t *) sb->baton;
apr_status_t rv;
-
+
scon->c->current_thread = thread;
-
+
rv = motorz_io_process(scon);
-
+
if (rv) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO()
"motorz_io_invoke: motorz_io_process failed (?)");
sb, APR_THREAD_TASK_PRIORITY_NORMAL, NULL);
}
-static apr_status_t motorz_io_callback(void *baton, apr_pollfd_t *pfd)
+static apr_status_t motorz_io_callback(void *baton, const apr_pollfd_t *pfd)
{
apr_status_t status = APR_SUCCESS;
motorz_core_t *mz = (motorz_core_t *) baton;
motorz_sb_t *sb = pfd->client_data;
-
-
+
+
if (sb->type == PT_ACCEPT) {
status = motorz_io_accept(mz, sb);
}
{
motorz_timer_t *elem = NULL;
apr_time_t t = apr_time_now() + relative_time;
-
+
apr_thread_mutex_lock(mz->mtx);
-
+
elem = (motorz_timer_t *) apr_pcalloc(shutdown_pool, sizeof(motorz_timer_t));
-
+
elem->expires = t;
elem->cb = cb;
elem->baton = baton;
apr_status_t rv;
motorz_core_t *mz;
conn_rec *c;
-
+
if (scon->c->clogging_input_filters && !scon->c->aborted) {
/* Since we have an input filter which 'clogs' the input stream,
* like mod_ssl used to, lets just do the normal read from input
scon->cs.state = CONN_STATE_LINGER;
}
}
-
+
mz = scon->mz;
c = scon->c;
-
+
while (!c->aborted) {
-
+
if (scon->pfd.reqevents != 0) {
/*
* Some of the pollset backends, like KQueue or Epoll
* therefore, we can accept _SUCCESS or _NOTFOUND,
* and we still want to keep going
*/
- rv = apr_pollcb_remove(mz->pollcb, &scon->pfd);
+ rv = apr_pollset_remove(mz->pollset, &scon->pfd);
if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO()
- "motorz_io_process: apr_pollcb_remove failure");
+ "motorz_io_process: apr_pollset_remove failure");
/*AP_DEBUG_ASSERT(rv == APR_SUCCESS);*/
}
scon->pfd.reqevents = 0;
}
-
+
if (scon->cs.state == CONN_STATE_READ_REQUEST_LINE) {
if (!c->aborted) {
ap_run_process_connection(c);
scon->cs.state = CONN_STATE_LINGER;
}
}
-
+
if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) {
ap_filter_t *output_filter = c->output_filters;
ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c);
while (output_filter->next != NULL) {
output_filter = output_filter->next;
}
-
+
rv = output_filter->frec->filter_func.out_func(output_filter,
NULL);
-
+
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO()
"network write failure in core output filter");
* Set a write timeout for this connection, and let the
* event thread poll for writeability.
*/
-
+
motorz_register_timer(scon->mz,
motorz_io_timeout_cb,
scon,
NULL ? scon->c->base_server->
timeout : ap_server_conf->timeout,
scon->pool);
-
+
scon->pfd.reqevents = (
scon->cs.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
scon->cs.sense = CONN_SENSE_DEFAULT;
-
- rv = apr_pollcb_add(mz->pollcb, &scon->pfd);
-
+
+ rv = apr_pollset_add(mz->pollset, &scon->pfd);
+
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
ap_server_conf, APLOGNO()
- "apr_pollcb_add: failed in write completion");
+ "apr_pollset_add: failed in write completion");
}
return APR_SUCCESS;
}
scon->cs.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
}
}
-
+
if (scon->cs.state == CONN_STATE_LINGER) {
ap_lingering_close(c);
apr_pool_destroy(scon->pool);
return APR_SUCCESS;
}
-
+
if (scon->cs.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
motorz_register_timer(scon->mz,
motorz_io_timeout_cb,
NULL ? scon->c->base_server->
timeout : ap_server_conf->timeout,
scon->pool);
-
+
scon->pfd.reqevents = (
scon->cs.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
scon->cs.sense = CONN_SENSE_DEFAULT;
-
- rv = apr_pollcb_add(mz->pollcb, &scon->pfd);
-
+
+ rv = apr_pollset_add(mz->pollset, &scon->pfd);
+
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO()
- "process_socket: apr_pollcb_add failure in read request line");
+ "process_socket: apr_pollset_add failure in read request line");
}
-
+
return APR_SUCCESS;
}
}
-
+
ap_lingering_close(c);
apr_pool_destroy(scon->pool);
return APR_SUCCESS;
}
+static apr_status_t motorz_pollset_cb(motorz_core_t *mz, apr_interval_time_t timeout)
+{
+ apr_status_t rc;
+ const apr_pollfd_t *out_pfd;
+ apr_int32_t num = 0;
+
+ rc = apr_pollset_poll(mz->pollset, timeout, &num, &out_pfd);
+ if (rc != APR_SUCCESS) {
+ if (APR_STATUS_IS_EINTR(rc) || APR_STATUS_IS_TIMEUP(rc)) {
+ return APR_SUCCESS;
+ } else {
+ return rc;
+ }
+ }
+ while (num) {
+ /* TODO: Error check */
+ motorz_io_callback(mz, out_pfd);
+ out_pfd++;
+ num--;
+ }
+ return APR_SUCCESS;
+}
+
/**
* Create worker thread pool.
*/
static apr_status_t motorz_setup_workers(motorz_core_t *mz)
{
apr_status_t rv;
-
+
rv = apr_thread_pool_create(&mz->workers,
threads_per_child,
threads_per_child, mz->pool);
-
+
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO()
"motorz_setup_workers: apr_thread_pool_create with %d threads failed",
threads_per_child);
return rv;
}
-
+
return APR_SUCCESS;
}
-static int motorz_setup_pollcb(motorz_core_t *mz)
+static int motorz_setup_pollset(motorz_core_t *mz)
{
int i;
apr_status_t rv;
int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL};
char *methods[] = {"kqueue", "port", "epoll"};
-
+
for (i = 0; i < sizeof(good_methods) / sizeof(void*); i++) {
- rv = apr_pollcb_create_ex(&mz->pollcb,
+ rv = apr_pollset_create_ex(&mz->pollset,
512,
mz->pool,
APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT,
good_methods[i]);
if (rv == APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO()
- "motorz_setup_pollcb: apr_pollcb_create_ex using %s", methods[i]);
+ "motorz_setup_pollset: apr_pollset_create_ex using %s", methods[i]);
break;
}
}
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_INFO, rv, ap_server_conf, APLOGNO()
- "motorz_setup_pollcb: apr_pollcb_create_ex failed for all possible backends!");
- rv = apr_pollcb_create(&mz->pollcb,
+ "motorz_setup_pollset: apr_pollset_create_ex failed for all possible backends!");
+ rv = apr_pollset_create(&mz->pollset,
512,
mz->pool,
APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
}
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, APLOGNO()
- "motorz_setup_pollcb: apr_pollcb_create failed for all possible backends!");
+ "motorz_setup_pollset: apr_pollset_create failed for all possible backends!");
}
return rv;
}
(void) ap_update_child_status(sbh, SERVER_READY, (request_rec *) NULL);
+#if 0
apr_skiplist_init(&mz->timer_ring, mz->pool);
apr_skiplist_set_compare(mz->timer_ring, indexing_comp, indexing_compk);
-
+#endif
status = motorz_setup_workers(mz);
if (status != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_CRIT, status, ap_server_conf, APLOGNO()
clean_child_exit(APEXIT_CHILDSICK);
}
- status = motorz_setup_pollcb(mz);
+ status = motorz_setup_pollset(mz);
if (status != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO()
- "Couldn't setup pollcb in child; check system or user limits");
+ "Couldn't setup pollset in child; check system or user limits");
clean_child_exit(APEXIT_CHILDSICK); /* assume temporary resource issue */
}
clean_child_exit(0);
}
- status = apr_pollcb_add(mz->pollcb, pfd);
+ status = apr_pollset_add(mz->pollset, pfd);
if (status != APR_SUCCESS) {
/* If the child processed a SIGWINCH before setting up the
* pollset, this error path is expected and harmless,
* pollute the logs in that case. */
if (!die_now) {
ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO()
- "Couldn't add listener to pollcb; check system or user limits");
+ "Couldn't add listener to pollset; check system or user limits");
clean_child_exit(APEXIT_CHILDSICK);
}
clean_child_exit(0);
apr_time_t tnow = apr_time_now();
motorz_timer_t *te;
apr_interval_time_t timeout = apr_time_from_msec(500);
-
+
apr_thread_mutex_lock(mz->mtx);
te = apr_skiplist_peek(mz->timer_ring);
-
+
if (te) {
if (tnow < te->expires) {
timeout = (te->expires - tnow);
}
}
apr_thread_mutex_unlock(mz->mtx);
-
- status = apr_pollcb_poll(mz->pollcb, timeout, motorz_io_callback, mz);
-
+
+ status = motorz_pollset_cb(mz, timeout);
+
tnow = apr_time_now();
-
- if (status) {
+
+ if (status != APR_SUCCESS) {
if (!APR_STATUS_IS_EINTR(status) && !APR_STATUS_IS_TIMEUP(status)) {
ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL,
"motorz_main_loop: apr_pollcb_poll failed");
clean_child_exit(0);
}
}
-
+
apr_thread_mutex_lock(mz->mtx);
-
+
/* now iterate any timers and push to worker pool */
while (te) {
if (te->expires < tnow) {
}
te = apr_skiplist_peek(mz->timer_ring);
}
-
+
apr_thread_mutex_unlock(mz->mtx);
}
if (ap_mpm_pod_check(my_bucket->pod) == APR_SUCCESS) { /* selected as idle? */
clean_child_exit(0);
}
-
static int make_child(motorz_core_t *mz, server_rec *s, int slot, int bucket)
{
int pid;
child_slot = ap_find_child_by_pid(&pid);
if (processed_status == APEXIT_CHILDFATAL) {
/* fix race condition found in PR 39311
- * A child created at the same time as a graceful happens
+ * A child created at the same time as a graceful happens
* can find the lock missing and create a fatal error.
* It is not fatal for the last generation to be in this state.
*/
if (!g_motorz_core) {
mz = g_motorz_core = ap_retained_data_create(userdata_key, sizeof(*g_motorz_core));
mz->max_daemons_limit = -1;
+ mz->timer_ring = motorz_timer_ring;
+ mz->pollset = motorz_pollset;
}
++mz->module_loads;
if (mz->module_loads == 2) {
if (err != NULL) {
return err;
}
-
+
threads_per_child = atoi(arg);
return NULL;
}