From 9aa0d5f56ead506ca102bb939feb9888233a8ed6 Mon Sep 17 00:00:00 2001 From: Yann Ylavic Date: Tue, 20 Feb 2018 12:56:16 +0000 Subject: [PATCH] Merge r1823047, r1824454, r1824463, r1824464, r1824497, r1824862 from trunk: mpm_event: move lingering close "sucker" from the listener to worker(s). This was the last non-constant time action performed by the listener thread. It's now handled by the worker thread directly after entering lingering close, which should directly address the cases when the socket is already closed remotely at that time, hence avoid more scheduling (it may be the common case for some scenarios). And it's only if the above would need blocking (i.e. more data to suck) that the socket is added to the pollset for the listener to re-schedule a worker later when ready. If no worker is available at that time then the socket is forcibly closed (similarly to what's done for keepalive connections in this case). Also, since process_lingering_close() is now called by a worker thread and with almost no depth in the call stack, we can grow the size of the "suck" buffer from 2K to 32K to potentially call recv() up to sixteen times less. mpm_event: follow up to r1823047. Update clogged counter on read_request retry too. mpm_event: follow up to r1823047: simplify "clogging" logic (reentrance). mpm_event: follow up to r1823047: complete state validation after processing. mpm_event: follow up to r1823047: CHANGES entry. mpm_event: follow up to r1823047 and r1824464. MMN bump for CONN_STATE_NUM, plus don't consider CONN_STATE_LINGER_* as valid states returned process_connection (never have been). Submitted by: ylavic Reviewed by: ylavic, minfrin, jim [Reverted by r1824874] git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1824868 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES | 2 + include/ap_mmn.h | 3 +- include/httpd.h | 4 +- server/mpm/event/event.c | 242 ++++++++++++++++++++++----------------- 4 files changed, 144 insertions(+), 107 deletions(-) diff --git a/CHANGES b/CHANGES index c551cd4d50..01ac798c09 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,8 @@ -*- coding: utf-8 -*- Changes with Apache 2.4.31 + *) mpm_event: Do lingering close in worker(s). [Yann Ylavic] + *) mpm_queue: Put fdqueue code in common for MPMs event and worker. [Yann Ylavic] diff --git a/include/ap_mmn.h b/include/ap_mmn.h index b18ea724bc..f57d64ad1b 100644 --- a/include/ap_mmn.h +++ b/include/ap_mmn.h @@ -511,6 +511,7 @@ * ap_regcomp_set_default_cflags and * ap_regcomp_default_cflag_by_name * 20120211.75 (2.4.30-dev) Add hostname_ex to proxy_worker_shared + * 20120211.76 (2.4.30-dev) Add CONN_STATE_NUM to enum conn_state_e */ #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */ @@ -518,7 +519,7 @@ #ifndef MODULE_MAGIC_NUMBER_MAJOR #define MODULE_MAGIC_NUMBER_MAJOR 20120211 #endif -#define MODULE_MAGIC_NUMBER_MINOR 75 /* 0...n */ +#define MODULE_MAGIC_NUMBER_MINOR 76 /* 0...n */ /** * Determine if the server's current MODULE_MAGIC_NUMBER is at least a diff --git a/include/httpd.h b/include/httpd.h index 61ab2e6ed6..a9fe056f34 100644 --- a/include/httpd.h +++ b/include/httpd.h @@ -1199,7 +1199,9 @@ typedef enum { CONN_STATE_SUSPENDED, CONN_STATE_LINGER, /* connection may be closed with lingering */ CONN_STATE_LINGER_NORMAL, /* MPM has started lingering close with normal timeout */ - CONN_STATE_LINGER_SHORT /* MPM has started lingering close with short timeout */ + CONN_STATE_LINGER_SHORT, /* MPM has started lingering close with short timeout */ + + CONN_STATE_NUM /* Number of states (keep/kept last) */ } conn_state_e; typedef enum { diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 74d2596afc..117d308092 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -799,19 +799,18 @@ static void notify_resume(event_conn_state_t *cs, int cleanup) */ static int start_lingering_close_blocking(event_conn_state_t *cs) { - apr_status_t rv; - struct timeout_queue *q; apr_socket_t *csd = cs->pfd.desc.s; if (ap_start_lingering_close(cs->c)) { notify_suspend(cs); apr_socket_close(csd); ap_queue_info_push_pool(worker_queue_info, cs->p); - return 0; + return DONE; } #ifdef AP_DEBUG { + apr_status_t rv; rv = apr_socket_timeout_set(csd, 0); AP_DEBUG_ASSERT(rv == APR_SUCCESS); } @@ -826,34 +825,15 @@ static int start_lingering_close_blocking(event_conn_state_t *cs) * DoS attacks. */ if (apr_table_get(cs->c->notes, "short-lingering-close")) { - q = short_linger_q; cs->pub.state = CONN_STATE_LINGER_SHORT; } else { - q = linger_q; cs->pub.state = CONN_STATE_LINGER_NORMAL; } apr_atomic_inc32(&lingering_count); notify_suspend(cs); - cs->pfd.reqevents = ( - cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT : - APR_POLLIN) | APR_POLLHUP | APR_POLLERR; - cs->pub.sense = CONN_SENSE_DEFAULT; - apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_APPEND(q, cs); - rv = apr_pollset_add(event_pollset, &cs->pfd); - if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { - TO_QUEUE_REMOVE(q, cs); - apr_thread_mutex_unlock(timeout_mutex); - ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092) - "start_lingering_close: apr_pollset_add failure"); - apr_socket_close(cs->pfd.desc.s); - ap_queue_info_push_pool(worker_queue_info, cs->p); - return 0; - } - apr_thread_mutex_unlock(timeout_mutex); - return 1; + return OK; } /* @@ -888,7 +868,7 @@ static int stop_lingering_close(event_conn_state_t *cs) { apr_socket_t *csd = ap_get_conn_socket(cs->c); ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf, - "socket reached timeout in lingering-close state"); + "socket abort in state %i", (int)cs->pub.state); abort_socket_nonblocking(csd); ap_queue_info_push_pool(worker_queue_info, cs->p); if (dying) @@ -962,6 +942,9 @@ static int event_post_read_request(request_rec *r) return OK; } +/* Forward declare */ +static void process_lingering_close(event_conn_state_t *cs); + /* * process one connection in the worker */ @@ -971,7 +954,9 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc { conn_rec *c; long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num); - int rc; + int clogging = 0; + apr_status_t rv; + int rc = OK; if (cs == NULL) { /* This is a new connection */ listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt)); @@ -1028,6 +1013,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->pub.state = CONN_STATE_READ_REQUEST_LINE; cs->pub.sense = CONN_SENSE_DEFAULT; + rc = OK; } else { c = cs->c; @@ -1038,30 +1024,34 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc c->id = conn_id; } - rc = OK; - if (c->aborted || cs->pub.state == CONN_STATE_LINGER) { + if (c->aborted) { /* do lingering close below */ cs->pub.state = CONN_STATE_LINGER; } - else if (c->clogging_input_filters) { - /* Since we have an input filter which 'clogs' the input stream, - * like mod_ssl used to, lets just do the normal read from input - * filters, like the Worker MPM does. Filters that need to write - * where they would otherwise read, or read where they would - * otherwise write, should set the sense appropriately. - */ - apr_atomic_inc32(&clogged_count); - rc = ap_run_process_connection(c); - apr_atomic_dec32(&clogged_count); - if (rc == DONE) { - rc = OK; - } + else if (cs->pub.state >= CONN_STATE_LINGER) { + /* fall through */ } - else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) { + else { + if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE + /* If we have an input filter which 'clogs' the input stream, + * like mod_ssl used to, lets just do the normal read from input + * filters, like the Worker MPM does. Filters that need to write + * where they would otherwise read, or read where they would + * otherwise write, should set the sense appropriately. + */ + || c->clogging_input_filters) { read_request: - rc = ap_run_process_connection(c); - if (rc == DONE) { - rc = OK; + clogging = c->clogging_input_filters; + if (clogging) { + apr_atomic_inc32(&clogged_count); + } + rc = ap_run_process_connection(c); + if (clogging) { + apr_atomic_dec32(&clogged_count); + } + if (rc == DONE) { + rc = OK; + } } } /* @@ -1069,13 +1059,16 @@ read_request: * appropriately upon return, for event MPM to either: * - do lingering close (CONN_STATE_LINGER), * - wait for readability of the next request with respect to the keepalive - * timeout (CONN_STATE_CHECK_REQUEST_LINE_READABLE), + * timeout (state CONN_STATE_CHECK_REQUEST_LINE_READABLE), + * - wait for read/write-ability of the underlying socket with respect to + * its timeout by setting c->clogging_input_filters to 1 and the sense + * to CONN_SENSE_WANT_READ/WRITE (state CONN_STATE_WRITE_COMPLETION), * - keep flushing the output filters stack in nonblocking mode, and then * if required wait for read/write-ability of the underlying socket with - * respect to its own timeout (CONN_STATE_WRITE_COMPLETION); since write + * respect to its own timeout (state CONN_STATE_WRITE_COMPLETION); since * completion at some point may require reads (e.g. SSL_ERROR_WANT_READ), - * an output filter can set the sense to CONN_SENSE_WANT_READ at any time - * for event MPM to do the right thing, + * an output filter can also set the sense to CONN_SENSE_WANT_READ at any + * time for event MPM to do the right thing, * - suspend the connection (SUSPENDED) such that it now interracts with * the MPM through suspend/resume_connection() hooks, and/or registered * poll callbacks (PT_USER), and/or registered timed callbacks triggered @@ -1089,7 +1082,8 @@ read_request: * while this was expected to do lingering close unconditionally with * worker or prefork MPMs for instance. */ - if (rc != OK || (cs->pub.state != CONN_STATE_LINGER + if (rc != OK || (cs->pub.state >= CONN_STATE_NUM) + || (cs->pub.state != CONN_STATE_LINGER && cs->pub.state != CONN_STATE_WRITE_COMPLETION && cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE && cs->pub.state != CONN_STATE_SUSPENDED)) { @@ -1137,11 +1131,12 @@ read_request: apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(cs->sc->wc_q, cs); - rc = apr_pollset_add(event_pollset, &cs->pfd); - if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) { + rv = apr_pollset_add(event_pollset, &cs->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + AP_DEBUG_ASSERT(0); TO_QUEUE_REMOVE(cs->sc->wc_q, cs); apr_thread_mutex_unlock(timeout_mutex); - ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465) + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465) "process_socket: apr_pollset_add failure for " "write completion"); apr_socket_close(cs->pfd.desc.s); @@ -1165,10 +1160,7 @@ read_request: } } - if (cs->pub.state == CONN_STATE_LINGER) { - start_lingering_close_blocking(cs); - } - else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { + if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL); /* It greatly simplifies the logic to use a single timeout value per q @@ -1186,22 +1178,35 @@ read_request: cs->pfd.reqevents = APR_POLLIN; apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(cs->sc->ka_q, cs); - rc = apr_pollset_add(event_pollset, &cs->pfd); - if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) { + rv = apr_pollset_add(event_pollset, &cs->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + AP_DEBUG_ASSERT(0); TO_QUEUE_REMOVE(cs->sc->ka_q, cs); apr_thread_mutex_unlock(timeout_mutex); - ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093) + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093) "process_socket: apr_pollset_add failure for " "keep alive"); apr_socket_close(cs->pfd.desc.s); ap_queue_info_push_pool(worker_queue_info, cs->p); - return; } - apr_thread_mutex_unlock(timeout_mutex); + else { + apr_thread_mutex_unlock(timeout_mutex); + } + return; } - else if (cs->pub.state == CONN_STATE_SUSPENDED) { + + if (cs->pub.state == CONN_STATE_SUSPENDED) { apr_atomic_inc32(&suspended_count); notify_suspend(cs); + return; + } + + if (cs->pub.state == CONN_STATE_LINGER) { + rc = start_lingering_close_blocking(cs); + } + if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL || + cs->pub.state == CONN_STATE_LINGER_SHORT)) { + process_lingering_close(cs); } } @@ -1452,18 +1457,18 @@ static apr_status_t event_register_timed_callback(apr_time_t t, /* * Close socket and clean up if remote closed its end while we were in - * lingering close. - * Only to be called in the listener thread; - * Pre-condition: cs is in one of the linger queues and in the pollset + * lingering close. Only to be called in the worker thread, and since it's + * in immediate call stack, we can afford a comfortable buffer size to + * consume data quickly. */ -static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *pfd) +#define LINGERING_BUF_SIZE (32 * 1024) +static void process_lingering_close(event_conn_state_t *cs) { apr_socket_t *csd = ap_get_conn_socket(cs->c); - char dummybuf[2048]; + char dummybuf[LINGERING_BUF_SIZE]; apr_size_t nbytes; apr_status_t rv; struct timeout_queue *q; - q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q; /* socket is already in non-blocking state */ do { @@ -1471,22 +1476,32 @@ static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t * rv = apr_socket_recv(csd, dummybuf, &nbytes); } while (rv == APR_SUCCESS); - if (APR_STATUS_IS_EAGAIN(rv)) { + if (!APR_STATUS_IS_EAGAIN(rv)) { + rv = apr_socket_close(csd); + AP_DEBUG_ASSERT(rv == APR_SUCCESS); + ap_queue_info_push_pool(worker_queue_info, cs->p); return; } + /* Re-queue the connection to come back when readable */ + cs->pfd.reqevents = APR_POLLIN; + cs->pub.sense = CONN_SENSE_DEFAULT; + q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q; apr_thread_mutex_lock(timeout_mutex); - TO_QUEUE_REMOVE(q, cs); - rv = apr_pollset_remove(event_pollset, pfd); + TO_QUEUE_APPEND(q, cs); + rv = apr_pollset_add(event_pollset, &cs->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + AP_DEBUG_ASSERT(0); + TO_QUEUE_REMOVE(q, cs); + apr_thread_mutex_unlock(timeout_mutex); + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092) + "process_lingering_close: apr_pollset_add failure"); + rv = apr_socket_close(cs->pfd.desc.s); + AP_DEBUG_ASSERT(rv == APR_SUCCESS); + ap_queue_info_push_pool(worker_queue_info, cs->p); + return; + } apr_thread_mutex_unlock(timeout_mutex); - AP_DEBUG_ASSERT(rv == APR_SUCCESS || APR_STATUS_IS_NOTFOUND(rv)); - - rv = apr_socket_close(csd); - AP_DEBUG_ASSERT(rv == APR_SUCCESS); - - ap_queue_info_push_pool(worker_queue_info, cs->p); - if (dying) - ap_queue_interrupt_one(worker_queue); } /* call 'func' for all elements of 'q' with timeout less than 'timeout_time'. @@ -1541,6 +1556,7 @@ static void process_timeout_queue(struct timeout_queue *q, last = cs; rv = apr_pollset_remove(event_pollset, &cs->pfd); if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { + AP_DEBUG_ASSERT(0); ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473) "apr_pollset_remove failed"); } @@ -1740,24 +1756,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) if (pt->type == PT_CSD) { /* one of the sockets is readable */ event_conn_state_t *cs = (event_conn_state_t *) pt->baton; - struct timeout_queue *remove_from_q = cs->sc->wc_q; - int blocking = 1; + struct timeout_queue *remove_from_q = NULL; + /* don't wait for a worker for a keepalive request or + * lingering close processing. */ + int blocking = 0; switch (cs->pub.state) { + case CONN_STATE_WRITE_COMPLETION: + remove_from_q = cs->sc->wc_q; + blocking = 1; + break; + case CONN_STATE_CHECK_REQUEST_LINE_READABLE: cs->pub.state = CONN_STATE_READ_REQUEST_LINE; remove_from_q = cs->sc->ka_q; - /* don't wait for a worker for a keepalive request */ - blocking = 0; - /* FALL THROUGH */ - case CONN_STATE_WRITE_COMPLETION: - get_worker(&have_idle_worker, blocking, - &workers_were_busy); + break; + + case CONN_STATE_LINGER_NORMAL: + remove_from_q = linger_q; + break; + + case CONN_STATE_LINGER_SHORT: + remove_from_q = short_linger_q; + break; + + default: + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, + ap_server_conf, APLOGNO(03096) + "event_loop: unexpected state %d", + cs->pub.state); + ap_assert(0); + } + + if (remove_from_q) { apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_REMOVE(remove_from_q, cs); rc = apr_pollset_remove(event_pollset, &cs->pfd); apr_thread_mutex_unlock(timeout_mutex); - /* * Some of the pollset backends, like KQueue or Epoll * automagically remove the FD if the socket is closed, @@ -1765,35 +1800,32 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * and we still want to keep going */ if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { + AP_DEBUG_ASSERT(0); ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03094) "pollset remove failed"); start_lingering_close_nonblocking(cs); break; } - /* If we didn't get a worker immediately for a keep-alive - * request, we close the connection, so that the client can - * re-connect to a different process. + /* If we don't get a worker immediately (nonblocking), we + * close the connection; the client can re-connect to a + * different process for keepalive, and for lingering close + * the connection will be reset so the choice is to favor + * incoming/alive connections. */ + get_worker(&have_idle_worker, blocking, + &workers_were_busy); if (!have_idle_worker) { - start_lingering_close_nonblocking(cs); + if (remove_from_q == cs->sc->ka_q) { + start_lingering_close_nonblocking(cs); + } + else { + stop_lingering_close(cs); + } } else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) { have_idle_worker = 0; } - break; - - case CONN_STATE_LINGER_NORMAL: - case CONN_STATE_LINGER_SHORT: - process_lingering_close(cs, out_pfd); - break; - - default: - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, - ap_server_conf, APLOGNO(03096) - "event_loop: unexpected state %d", - cs->pub.state); - ap_assert(0); } } else if (pt->type == PT_ACCEPT && !listeners_disabled()) { -- 2.40.0