From 627711af7f1183897a063e3306d3be60db5bd0a2 Mon Sep 17 00:00:00 2001 From: Jim Jagielski Date: Mon, 9 Mar 2015 18:26:13 +0000 Subject: [PATCH] use pollset impl instead of pollcb git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1665317 13f79535-47bb-0310-9956-ffa450edef68 --- server/mpm/motorz/motorz.c | 196 +++++++++++++++++++++---------------- server/mpm/motorz/motorz.h | 7 +- 2 files changed, 118 insertions(+), 85 deletions(-) diff --git a/server/mpm/motorz/motorz.c b/server/mpm/motorz/motorz.c index 087f30a824..89f8c37a49 100644 --- a/server/mpm/motorz/motorz.c +++ b/server/mpm/motorz/motorz.c @@ -56,6 +56,9 @@ static void clean_child_exit(int code) __attribute__ ((noreturn)); static apr_status_t motorz_io_process(motorz_conn_t *scon); static void clean_child_exit(int code) __attribute__ ((noreturn)); +static apr_pollset_t *motorz_pollset; +static apr_skiplist *motorz_timer_ring; + static motorz_core_t *motorz_core_get() { return g_motorz_core; @@ -82,11 +85,11 @@ static apr_status_t motorz_timer_pool_cleanup(void *baton) { motorz_timer_t *elem = (motorz_timer_t *)baton; motorz_core_t *mz = elem->mz; - + apr_thread_mutex_lock(mz->mtx); apr_skiplist_remove(mz->timer_ring, elem, NULL); apr_thread_mutex_unlock(mz->mtx); - + return APR_SUCCESS; } @@ -97,10 +100,10 @@ static void motorz_io_timeout_cb(motorz_core_t * sc, void *baton) motorz_conn_t *mzon = (motorz_conn_t *) baton; /* pqXXXXX: handle timeouts. */ conn_rec *c = scon->c; - + cs = NULL; #endif - + ap_log_error(APLOG_MARK, APLOG_WARNING, 0, ap_server_conf, APLOGNO(00247) "io timeout hit (?)"); } @@ -112,43 +115,43 @@ static void *motorz_io_setup_conn(apr_thread_t * thread, void *baton) long conn_id = 0; motorz_sb_t *sb; motorz_conn_t *scon = (motorz_conn_t *) baton; - + ap_create_sb_handle(&sbh, scon->pool, 0, 0); scon->sbh = sbh; scon->ba = apr_bucket_alloc_create(scon->pool); - + scon->c = ap_run_create_connection(scon->pool, ap_server_conf, scon->sock, conn_id, sbh, scon->ba); /* XXX: handle failure */ - + scon->c->cs = &scon->cs; sb = apr_pcalloc(scon->pool, sizeof(motorz_sb_t)); - + scon->c->current_thread = thread; - + scon->pfd.p = scon->pool; scon->pfd.desc_type = APR_POLL_SOCKET; scon->pfd.desc.s = scon->sock; scon->pfd.reqevents = APR_POLLIN; - + sb->type = PT_CSD; sb->baton = scon; scon->pfd.client_data = sb; - + ap_update_vhost_given_ip(scon->c); - + status = ap_run_pre_connection(scon->c, scon->sock); if (status != OK && status != DONE) { ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO() "motorz_io_setup_conn: connection aborted"); scon->c->aborted = 1; } - + scon->cs.state = CONN_STATE_READ_REQUEST_LINE; scon->cs.sense = CONN_SENSE_DEFAULT; - + status = motorz_io_process(scon); - + if (status) { ap_log_error(APLOG_MARK, APLOG_DEBUG, status, ap_server_conf, APLOGNO() "motorz_io_setup_conn: motorz_io_process failed (?)"); @@ -168,11 +171,11 @@ static apr_status_t motorz_io_accept(motorz_core_t *mz, motorz_sb_t *sb) apr_pool_t *ptrans; apr_socket_t *socket; ap_listen_rec *lr = (ap_listen_rec *) sb->baton; - + apr_pool_create(&ptrans, NULL); - + apr_pool_tag(ptrans, "transaction"); - + rv = lr->accept_func((void *)&socket, lr, ptrans); if (rv) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO() @@ -184,27 +187,27 @@ static apr_status_t motorz_io_accept(motorz_core_t *mz, motorz_sb_t *sb) scon->pool = ptrans; scon->sock = socket; scon->mz = mz; - + return apr_thread_pool_push(mz->workers, motorz_io_setup_conn, scon, APR_THREAD_TASK_PRIORITY_NORMAL, NULL); } - + return APR_SUCCESS; } static void motorz_timer_run(motorz_timer_t *ep) { apr_pool_cleanup_kill(ep->pool, ep, motorz_timer_pool_cleanup); - + ep->cb(ep->mz, ep->baton); } static void *motorz_timer_invoke(apr_thread_t *thread, void *baton) { motorz_timer_t *ep = (motorz_timer_t *)baton; - + motorz_timer_run(ep); return NULL; } @@ -214,11 +217,11 @@ static void *motorz_io_invoke(apr_thread_t * thread, void *baton) motorz_sb_t *sb = (motorz_sb_t *) baton; motorz_conn_t *scon = (motorz_conn_t *) sb->baton; apr_status_t rv; - + scon->c->current_thread = thread; - + rv = motorz_io_process(scon); - + if (rv) { ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO() "motorz_io_invoke: motorz_io_process failed (?)"); @@ -233,13 +236,13 @@ static apr_status_t motorz_io_event_process(motorz_core_t *mz, motorz_sb_t *sb) sb, APR_THREAD_TASK_PRIORITY_NORMAL, NULL); } -static apr_status_t motorz_io_callback(void *baton, apr_pollfd_t *pfd) +static apr_status_t motorz_io_callback(void *baton, const apr_pollfd_t *pfd) { apr_status_t status = APR_SUCCESS; motorz_core_t *mz = (motorz_core_t *) baton; motorz_sb_t *sb = pfd->client_data; - - + + if (sb->type == PT_ACCEPT) { status = motorz_io_accept(mz, sb); } @@ -259,11 +262,11 @@ static void motorz_register_timer(motorz_core_t *mz, { motorz_timer_t *elem = NULL; apr_time_t t = apr_time_now() + relative_time; - + apr_thread_mutex_lock(mz->mtx); - + elem = (motorz_timer_t *) apr_pcalloc(shutdown_pool, sizeof(motorz_timer_t)); - + elem->expires = t; elem->cb = cb; elem->baton = baton; @@ -279,7 +282,7 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) apr_status_t rv; motorz_core_t *mz; conn_rec *c; - + if (scon->c->clogging_input_filters && !scon->c->aborted) { /* Since we have an input filter which 'clogs' the input stream, * like mod_ssl used to, lets just do the normal read from input @@ -292,12 +295,12 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) scon->cs.state = CONN_STATE_LINGER; } } - + mz = scon->mz; c = scon->c; - + while (!c->aborted) { - + if (scon->pfd.reqevents != 0) { /* * Some of the pollset backends, like KQueue or Epoll @@ -305,15 +308,15 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) * therefore, we can accept _SUCCESS or _NOTFOUND, * and we still want to keep going */ - rv = apr_pollcb_remove(mz->pollcb, &scon->pfd); + rv = apr_pollset_remove(mz->pollset, &scon->pfd); if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO() - "motorz_io_process: apr_pollcb_remove failure"); + "motorz_io_process: apr_pollset_remove failure"); /*AP_DEBUG_ASSERT(rv == APR_SUCCESS);*/ } scon->pfd.reqevents = 0; } - + if (scon->cs.state == CONN_STATE_READ_REQUEST_LINE) { if (!c->aborted) { ap_run_process_connection(c); @@ -326,17 +329,17 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) scon->cs.state = CONN_STATE_LINGER; } } - + if (scon->cs.state == CONN_STATE_WRITE_COMPLETION) { ap_filter_t *output_filter = c->output_filters; ap_update_child_status_from_conn(scon->sbh, SERVER_BUSY_WRITE, c); while (output_filter->next != NULL) { output_filter = output_filter->next; } - + rv = output_filter->frec->filter_func.out_func(output_filter, NULL); - + if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO() "network write failure in core output filter"); @@ -347,7 +350,7 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) * Set a write timeout for this connection, and let the * event thread poll for writeability. */ - + motorz_register_timer(scon->mz, motorz_io_timeout_cb, scon, @@ -355,18 +358,18 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) NULL ? scon->c->base_server-> timeout : ap_server_conf->timeout, scon->pool); - + scon->pfd.reqevents = ( scon->cs.sense == CONN_SENSE_WANT_READ ? APR_POLLIN : APR_POLLOUT) | APR_POLLHUP | APR_POLLERR; scon->cs.sense = CONN_SENSE_DEFAULT; - - rv = apr_pollcb_add(mz->pollcb, &scon->pfd); - + + rv = apr_pollset_add(mz->pollset, &scon->pfd); + if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf, APLOGNO() - "apr_pollcb_add: failed in write completion"); + "apr_pollset_add: failed in write completion"); } return APR_SUCCESS; } @@ -380,13 +383,13 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) scon->cs.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE; } } - + if (scon->cs.state == CONN_STATE_LINGER) { ap_lingering_close(c); apr_pool_destroy(scon->pool); return APR_SUCCESS; } - + if (scon->cs.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { motorz_register_timer(scon->mz, motorz_io_timeout_cb, @@ -395,80 +398,103 @@ static apr_status_t motorz_io_process(motorz_conn_t *scon) NULL ? scon->c->base_server-> timeout : ap_server_conf->timeout, scon->pool); - + scon->pfd.reqevents = ( scon->cs.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT : APR_POLLIN) | APR_POLLHUP | APR_POLLERR; scon->cs.sense = CONN_SENSE_DEFAULT; - - rv = apr_pollcb_add(mz->pollcb, &scon->pfd); - + + rv = apr_pollset_add(mz->pollset, &scon->pfd); + if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO() - "process_socket: apr_pollcb_add failure in read request line"); + "process_socket: apr_pollset_add failure in read request line"); } - + return APR_SUCCESS; } } - + ap_lingering_close(c); apr_pool_destroy(scon->pool); return APR_SUCCESS; } +static apr_status_t motorz_pollset_cb(motorz_core_t *mz, apr_interval_time_t timeout) +{ + apr_status_t rc; + const apr_pollfd_t *out_pfd; + apr_int32_t num = 0; + + rc = apr_pollset_poll(mz->pollset, timeout, &num, &out_pfd); + if (rc != APR_SUCCESS) { + if (APR_STATUS_IS_EINTR(rc) || APR_STATUS_IS_TIMEUP(rc)) { + return APR_SUCCESS; + } else { + return rc; + } + } + while (num) { + /* TODO: Error check */ + motorz_io_callback(mz, out_pfd); + out_pfd++; + num--; + } + return APR_SUCCESS; +} + /** * Create worker thread pool. */ static apr_status_t motorz_setup_workers(motorz_core_t *mz) { apr_status_t rv; - + rv = apr_thread_pool_create(&mz->workers, threads_per_child, threads_per_child, mz->pool); - + if (rv) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, APLOGNO() "motorz_setup_workers: apr_thread_pool_create with %d threads failed", threads_per_child); return rv; } - + return APR_SUCCESS; } -static int motorz_setup_pollcb(motorz_core_t *mz) +static int motorz_setup_pollset(motorz_core_t *mz) { int i; apr_status_t rv; int good_methods[] = {APR_POLLSET_KQUEUE, APR_POLLSET_PORT, APR_POLLSET_EPOLL}; char *methods[] = {"kqueue", "port", "epoll"}; - + for (i = 0; i < sizeof(good_methods) / sizeof(void*); i++) { - rv = apr_pollcb_create_ex(&mz->pollcb, + rv = apr_pollset_create_ex(&mz->pollset, 512, mz->pool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY | APR_POLLSET_NODEFAULT, good_methods[i]); if (rv == APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, APLOGNO() - "motorz_setup_pollcb: apr_pollcb_create_ex using %s", methods[i]); + "motorz_setup_pollset: apr_pollset_create_ex using %s", methods[i]); break; } } if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_INFO, rv, ap_server_conf, APLOGNO() - "motorz_setup_pollcb: apr_pollcb_create_ex failed for all possible backends!"); - rv = apr_pollcb_create(&mz->pollcb, + "motorz_setup_pollset: apr_pollset_create_ex failed for all possible backends!"); + rv = apr_pollset_create(&mz->pollset, 512, mz->pool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY); } if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, APLOGNO() - "motorz_setup_pollcb: apr_pollcb_create failed for all possible backends!"); + "motorz_setup_pollset: apr_pollset_create failed for all possible backends!"); } return rv; } @@ -847,9 +873,10 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) (void) ap_update_child_status(sbh, SERVER_READY, (request_rec *) NULL); +#if 0 apr_skiplist_init(&mz->timer_ring, mz->pool); apr_skiplist_set_compare(mz->timer_ring, indexing_comp, indexing_compk); - +#endif status = motorz_setup_workers(mz); if (status != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, status, ap_server_conf, APLOGNO() @@ -857,10 +884,10 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) clean_child_exit(APEXIT_CHILDSICK); } - status = motorz_setup_pollcb(mz); + status = motorz_setup_pollset(mz); if (status != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO() - "Couldn't setup pollcb in child; check system or user limits"); + "Couldn't setup pollset in child; check system or user limits"); clean_child_exit(APEXIT_CHILDSICK); /* assume temporary resource issue */ } @@ -885,7 +912,7 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) clean_child_exit(0); } - status = apr_pollcb_add(mz->pollcb, pfd); + status = apr_pollset_add(mz->pollset, pfd); if (status != APR_SUCCESS) { /* If the child processed a SIGWINCH before setting up the * pollset, this error path is expected and harmless, @@ -893,7 +920,7 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) * pollute the logs in that case. */ if (!die_now) { ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf, APLOGNO() - "Couldn't add listener to pollcb; check system or user limits"); + "Couldn't add listener to pollset; check system or user limits"); clean_child_exit(APEXIT_CHILDSICK); } clean_child_exit(0); @@ -926,10 +953,10 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) apr_time_t tnow = apr_time_now(); motorz_timer_t *te; apr_interval_time_t timeout = apr_time_from_msec(500); - + apr_thread_mutex_lock(mz->mtx); te = apr_skiplist_peek(mz->timer_ring); - + if (te) { if (tnow < te->expires) { timeout = (te->expires - tnow); @@ -942,21 +969,21 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) } } apr_thread_mutex_unlock(mz->mtx); - - status = apr_pollcb_poll(mz->pollcb, timeout, motorz_io_callback, mz); - + + status = motorz_pollset_cb(mz, timeout); + tnow = apr_time_now(); - - if (status) { + + if (status != APR_SUCCESS) { if (!APR_STATUS_IS_EINTR(status) && !APR_STATUS_IS_TIMEUP(status)) { ap_log_error(APLOG_MARK, APLOG_CRIT, status, NULL, "motorz_main_loop: apr_pollcb_poll failed"); clean_child_exit(0); } } - + apr_thread_mutex_lock(mz->mtx); - + /* now iterate any timers and push to worker pool */ while (te) { if (te->expires < tnow) { @@ -970,7 +997,7 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) } te = apr_skiplist_peek(mz->timer_ring); } - + apr_thread_mutex_unlock(mz->mtx); } if (ap_mpm_pod_check(my_bucket->pod) == APR_SUCCESS) { /* selected as idle? */ @@ -988,7 +1015,6 @@ static void child_main(motorz_core_t *mz, int child_num_arg, int child_bucket) clean_child_exit(0); } - static int make_child(motorz_core_t *mz, server_rec *s, int slot, int bucket) { int pid; @@ -1232,7 +1258,7 @@ static int motorz_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s) child_slot = ap_find_child_by_pid(&pid); if (processed_status == APEXIT_CHILDFATAL) { /* fix race condition found in PR 39311 - * A child created at the same time as a graceful happens + * A child created at the same time as a graceful happens * can find the lock missing and create a fatal error. * It is not fatal for the last generation to be in this state. */ @@ -1544,6 +1570,8 @@ static int motorz_pre_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp) if (!g_motorz_core) { mz = g_motorz_core = ap_retained_data_create(userdata_key, sizeof(*g_motorz_core)); mz->max_daemons_limit = -1; + mz->timer_ring = motorz_timer_ring; + mz->pollset = motorz_pollset; } ++mz->module_loads; if (mz->module_loads == 2) { @@ -1765,7 +1793,7 @@ static const char *set_threads_per_child(cmd_parms * cmd, void *dummy, if (err != NULL) { return err; } - + threads_per_child = atoi(arg); return NULL; } diff --git a/server/mpm/motorz/motorz.h b/server/mpm/motorz/motorz.h index 87ffd957cd..d97ef25149 100644 --- a/server/mpm/motorz/motorz.h +++ b/server/mpm/motorz/motorz.h @@ -128,7 +128,7 @@ struct motorz_core_t { int max_daemons_limit; apr_pool_t *pool; apr_thread_mutex_t *mtx; - apr_pollcb_t *pollcb; + apr_pollset_t *pollset; apr_skiplist *timer_ring; apr_thread_pool_t *workers; }; @@ -179,7 +179,12 @@ struct motorz_conn_t apr_socket_t *sock; apr_bucket_alloc_t *ba; ap_sb_handle_t *sbh; + /** connection record this struct refers to */ conn_rec *c; + /** request record (if any) this struct refers to */ + request_rec *r; + /** is the current conn_rec suspended? */ + int suspended; /** poll file descriptor information */ apr_pollfd_t pfd; /** public parts of the connection state */ -- 2.40.0