*/
static int start_lingering_close_blocking(event_conn_state_t *cs)
{
+ apr_status_t rv;
+ struct timeout_queue *q;
apr_socket_t *csd = cs->pfd.desc.s;
if (ap_start_lingering_close(cs->c)) {
notify_suspend(cs);
apr_socket_close(csd);
ap_queue_info_push_pool(worker_queue_info, cs->p);
- return DONE;
+ return 0;
}
#ifdef AP_DEBUG
{
- apr_status_t rv;
rv = apr_socket_timeout_set(csd, 0);
AP_DEBUG_ASSERT(rv == APR_SUCCESS);
}
* DoS attacks.
*/
if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+ q = short_linger_q;
cs->pub.state = CONN_STATE_LINGER_SHORT;
}
else {
+ q = linger_q;
cs->pub.state = CONN_STATE_LINGER_NORMAL;
}
apr_atomic_inc32(&lingering_count);
notify_suspend(cs);
- return OK;
+ cs->pfd.reqevents = (
+ cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
+ APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
+ cs->pub.sense = CONN_SENSE_DEFAULT;
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(q, cs);
+ rv = apr_pollset_add(event_pollset, &cs->pfd);
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+ TO_QUEUE_REMOVE(q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
+ "start_lingering_close: apr_pollset_add failure");
+ apr_socket_close(cs->pfd.desc.s);
+ ap_queue_info_push_pool(worker_queue_info, cs->p);
+ return 0;
+ }
+ apr_thread_mutex_unlock(timeout_mutex);
+ return 1;
}
/*
{
apr_socket_t *csd = ap_get_conn_socket(cs->c);
ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
- "socket abort in state %i", (int)cs->pub.state);
+ "socket reached timeout in lingering-close state");
abort_socket_nonblocking(csd);
ap_queue_info_push_pool(worker_queue_info, cs->p);
if (dying)
return OK;
}
-/* Forward declare */
-static void process_lingering_close(event_conn_state_t *cs);
-
/*
* process one connection in the worker
*/
{
conn_rec *c;
long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
- int clogging = 0;
- apr_status_t rv;
- int rc = OK;
+ int rc;
if (cs == NULL) { /* This is a new connection */
listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
cs->pub.sense = CONN_SENSE_DEFAULT;
- rc = OK;
}
else {
c = cs->c;
c->id = conn_id;
}
- if (c->aborted) {
+ rc = OK;
+ if (c->aborted || cs->pub.state == CONN_STATE_LINGER) {
/* do lingering close below */
cs->pub.state = CONN_STATE_LINGER;
}
- else if (cs->pub.state >= CONN_STATE_LINGER) {
- /* fall through */
+ else if (c->clogging_input_filters) {
+ /* Since we have an input filter which 'clogs' the input stream,
+ * like mod_ssl used to, lets just do the normal read from input
+ * filters, like the Worker MPM does. Filters that need to write
+ * where they would otherwise read, or read where they would
+ * otherwise write, should set the sense appropriately.
+ */
+ apr_atomic_inc32(&clogged_count);
+ rc = ap_run_process_connection(c);
+ apr_atomic_dec32(&clogged_count);
+ if (rc == DONE) {
+ rc = OK;
+ }
}
- else {
- if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE
- /* If we have an input filter which 'clogs' the input stream,
- * like mod_ssl used to, lets just do the normal read from input
- * filters, like the Worker MPM does. Filters that need to write
- * where they would otherwise read, or read where they would
- * otherwise write, should set the sense appropriately.
- */
- || c->clogging_input_filters) {
+ else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
read_request:
- clogging = c->clogging_input_filters;
- if (clogging) {
- apr_atomic_inc32(&clogged_count);
- }
- rc = ap_run_process_connection(c);
- if (clogging) {
- apr_atomic_dec32(&clogged_count);
- }
- if (rc == DONE) {
- rc = OK;
- }
+ rc = ap_run_process_connection(c);
+ if (rc == DONE) {
+ rc = OK;
}
}
/*
* appropriately upon return, for event MPM to either:
* - do lingering close (CONN_STATE_LINGER),
* - wait for readability of the next request with respect to the keepalive
- * timeout (state CONN_STATE_CHECK_REQUEST_LINE_READABLE),
- * - wait for read/write-ability of the underlying socket with respect to
- * its timeout by setting c->clogging_input_filters to 1 and the sense
- * to CONN_SENSE_WANT_READ/WRITE (state CONN_STATE_WRITE_COMPLETION),
+ * timeout (CONN_STATE_CHECK_REQUEST_LINE_READABLE),
* - keep flushing the output filters stack in nonblocking mode, and then
* if required wait for read/write-ability of the underlying socket with
- * respect to its own timeout (state CONN_STATE_WRITE_COMPLETION); since
+ * respect to its own timeout (CONN_STATE_WRITE_COMPLETION); since write
* completion at some point may require reads (e.g. SSL_ERROR_WANT_READ),
- * an output filter can also set the sense to CONN_SENSE_WANT_READ at any
- * time for event MPM to do the right thing,
+ * an output filter can set the sense to CONN_SENSE_WANT_READ at any time
+ * for event MPM to do the right thing,
* - suspend the connection (SUSPENDED) such that it now interracts with
* the MPM through suspend/resume_connection() hooks, and/or registered
* poll callbacks (PT_USER), and/or registered timed callbacks triggered
* while this was expected to do lingering close unconditionally with
* worker or prefork MPMs for instance.
*/
- if (rc != OK || (cs->pub.state >= CONN_STATE_NUM)
- || (cs->pub.state != CONN_STATE_LINGER
+ if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
&& cs->pub.state != CONN_STATE_WRITE_COMPLETION
&& cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE
&& cs->pub.state != CONN_STATE_SUSPENDED)) {
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->wc_q, cs);
- rv = apr_pollset_add(event_pollset, &cs->pfd);
- if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
- AP_DEBUG_ASSERT(0);
+ rc = apr_pollset_add(event_pollset, &cs->pfd);
+ if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
TO_QUEUE_REMOVE(cs->sc->wc_q, cs);
apr_thread_mutex_unlock(timeout_mutex);
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
+ ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03465)
"process_socket: apr_pollset_add failure for "
"write completion");
apr_socket_close(cs->pfd.desc.s);
}
}
- if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+ if (cs->pub.state == CONN_STATE_LINGER) {
+ start_lingering_close_blocking(cs);
+ }
+ else if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);
/* It greatly simplifies the logic to use a single timeout value per q
cs->pfd.reqevents = APR_POLLIN;
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->ka_q, cs);
- rv = apr_pollset_add(event_pollset, &cs->pfd);
- if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
- AP_DEBUG_ASSERT(0);
+ rc = apr_pollset_add(event_pollset, &cs->pfd);
+ if (rc != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rc)) {
TO_QUEUE_REMOVE(cs->sc->ka_q, cs);
apr_thread_mutex_unlock(timeout_mutex);
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
+ ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(03093)
"process_socket: apr_pollset_add failure for "
"keep alive");
apr_socket_close(cs->pfd.desc.s);
ap_queue_info_push_pool(worker_queue_info, cs->p);
+ return;
}
- else {
- apr_thread_mutex_unlock(timeout_mutex);
- }
- return;
+ apr_thread_mutex_unlock(timeout_mutex);
}
-
- if (cs->pub.state == CONN_STATE_SUSPENDED) {
+ else if (cs->pub.state == CONN_STATE_SUSPENDED) {
apr_atomic_inc32(&suspended_count);
notify_suspend(cs);
- return;
- }
-
- if (cs->pub.state == CONN_STATE_LINGER) {
- rc = start_lingering_close_blocking(cs);
- }
- if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
- cs->pub.state == CONN_STATE_LINGER_SHORT)) {
- process_lingering_close(cs);
}
}
/*
* Close socket and clean up if remote closed its end while we were in
- * lingering close. Only to be called in the worker thread, and since it's
- * in immediate call stack, we can afford a comfortable buffer size to
- * consume data quickly.
+ * lingering close.
+ * Only to be called in the listener thread;
+ * Pre-condition: cs is in one of the linger queues and in the pollset
*/
-#define LINGERING_BUF_SIZE (32 * 1024)
-static void process_lingering_close(event_conn_state_t *cs)
+static void process_lingering_close(event_conn_state_t *cs, const apr_pollfd_t *pfd)
{
apr_socket_t *csd = ap_get_conn_socket(cs->c);
- char dummybuf[LINGERING_BUF_SIZE];
+ char dummybuf[2048];
apr_size_t nbytes;
apr_status_t rv;
struct timeout_queue *q;
+ q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
/* socket is already in non-blocking state */
do {
rv = apr_socket_recv(csd, dummybuf, &nbytes);
} while (rv == APR_SUCCESS);
- if (!APR_STATUS_IS_EAGAIN(rv)) {
- rv = apr_socket_close(csd);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ if (APR_STATUS_IS_EAGAIN(rv)) {
return;
}
- /* Re-queue the connection to come back when readable */
- cs->pfd.reqevents = APR_POLLIN;
- cs->pub.sense = CONN_SENSE_DEFAULT;
- q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
apr_thread_mutex_lock(timeout_mutex);
- TO_QUEUE_APPEND(q, cs);
- rv = apr_pollset_add(event_pollset, &cs->pfd);
- if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
- AP_DEBUG_ASSERT(0);
- TO_QUEUE_REMOVE(q, cs);
- apr_thread_mutex_unlock(timeout_mutex);
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
- "process_lingering_close: apr_pollset_add failure");
- rv = apr_socket_close(cs->pfd.desc.s);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
- return;
- }
+ TO_QUEUE_REMOVE(q, cs);
+ rv = apr_pollset_remove(event_pollset, pfd);
apr_thread_mutex_unlock(timeout_mutex);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS || APR_STATUS_IS_NOTFOUND(rv));
+
+ rv = apr_socket_close(csd);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+
+ ap_queue_info_push_pool(worker_queue_info, cs->p);
+ if (dying)
+ ap_queue_interrupt_one(worker_queue);
}
/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
last = cs;
rv = apr_pollset_remove(event_pollset, &cs->pfd);
if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
- AP_DEBUG_ASSERT(0);
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
"apr_pollset_remove failed");
}
if (pt->type == PT_CSD) {
/* one of the sockets is readable */
event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
- struct timeout_queue *remove_from_q = NULL;
- /* don't wait for a worker for a keepalive request or
- * lingering close processing. */
- int blocking = 0;
+ struct timeout_queue *remove_from_q = cs->sc->wc_q;
+ int blocking = 1;
switch (cs->pub.state) {
- case CONN_STATE_WRITE_COMPLETION:
- remove_from_q = cs->sc->wc_q;
- blocking = 1;
- break;
-
case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
remove_from_q = cs->sc->ka_q;
- break;
-
- case CONN_STATE_LINGER_NORMAL:
- remove_from_q = linger_q;
- break;
-
- case CONN_STATE_LINGER_SHORT:
- remove_from_q = short_linger_q;
- break;
-
- default:
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(03096)
- "event_loop: unexpected state %d",
- cs->pub.state);
- ap_assert(0);
- }
-
- if (remove_from_q) {
+ /* don't wait for a worker for a keepalive request */
+ blocking = 0;
+ /* FALL THROUGH */
+ case CONN_STATE_WRITE_COMPLETION:
+ get_worker(&have_idle_worker, blocking,
+ &workers_were_busy);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_REMOVE(remove_from_q, cs);
rc = apr_pollset_remove(event_pollset, &cs->pfd);
apr_thread_mutex_unlock(timeout_mutex);
+
/*
* Some of the pollset backends, like KQueue or Epoll
* automagically remove the FD if the socket is closed,
* and we still want to keep going
*/
if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
- AP_DEBUG_ASSERT(0);
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
APLOGNO(03094) "pollset remove failed");
start_lingering_close_nonblocking(cs);
break;
}
- /* If we don't get a worker immediately (nonblocking), we
- * close the connection; the client can re-connect to a
- * different process for keepalive, and for lingering close
- * the connection will be reset so the choice is to favor
- * incoming/alive connections.
+ /* If we didn't get a worker immediately for a keep-alive
+ * request, we close the connection, so that the client can
+ * re-connect to a different process.
*/
- get_worker(&have_idle_worker, blocking,
- &workers_were_busy);
if (!have_idle_worker) {
- if (remove_from_q == cs->sc->ka_q) {
- start_lingering_close_nonblocking(cs);
- }
- else {
- stop_lingering_close(cs);
- }
+ start_lingering_close_nonblocking(cs);
}
else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
have_idle_worker = 0;
}
+ break;
+
+ case CONN_STATE_LINGER_NORMAL:
+ case CONN_STATE_LINGER_SHORT:
+ process_lingering_close(cs, out_pfd);
+ break;
+
+ default:
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
+ ap_server_conf, APLOGNO(03096)
+ "event_loop: unexpected state %d",
+ cs->pub.state);
+ ap_assert(0);
}
}
else if (pt->type == PT_ACCEPT && !listeners_disabled()) {