*/
static apr_pollset_t *event_pollset;
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain;
+
struct event_conn_state_t {
/** APR_RING of expiration timeouts */
APR_RING_ENTRY(event_conn_state_t) timeout_list;
apr_pollfd_t pfd;
/** public parts of the connection state */
conn_state_t pub;
+ /** chaining in defer_linger_chain */
+ struct event_conn_state_t *chain;
};
+
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
struct timeout_queue {
ap_scoreboard_image->parent[process_slot].not_accepting = 0;
}
+static void abort_socket_nonblocking(apr_socket_t *csd)
+{
+ apr_status_t rv;
+ apr_socket_timeout_set(csd, 0);
+#if defined(SOL_SOCKET) && defined(SO_LINGER)
+ /* This socket is over now, and we don't want to block nor linger
+ * anymore, so reset it. A normal close could still linger in the
+ * system, while RST is fast, nonblocking, and what the peer will
+ * get if it sends us further data anyway.
+ */
+ {
+ apr_os_sock_t osd = -1;
+ struct linger opt;
+ opt.l_onoff = 1;
+ opt.l_linger = 0; /* zero timeout is RST */
+ apr_os_sock_get(&osd, csd);
+ setsockopt(osd, SOL_SOCKET, SO_LINGER, (void *)&opt, sizeof opt);
+ }
+#endif
+ rv = apr_socket_close(csd);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468)
+ "error closing socket");
+ AP_DEBUG_ASSERT(0);
+ }
+}
+
static void close_worker_sockets(void)
{
int i;
for (i = 0; i < threads_per_child; i++) {
- if (worker_sockets[i]) {
- apr_socket_close(worker_sockets[i]);
+ apr_socket_t *csd = worker_sockets[i];
+ if (csd) {
worker_sockets[i] = NULL;
+ abort_socket_nonblocking(csd);
+ }
+ }
+ for (;;) {
+ event_conn_state_t *cs = defer_linger_chain;
+ if (!cs) {
+ break;
+ }
+ if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+ cs) != cs) {
+ /* Race lost, try again */
+ continue;
}
+ cs->chain = NULL;
+ abort_socket_nonblocking(cs->pfd.desc.s);
}
}
ap_run_resume_connection(cs->c, cs->r);
}
-static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
+/*
+ * Close our side of the connection, flushing data to the client first.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ * timeout_mutex is not locked
+ * return: 0 if connection is fully closed,
+ * 1 if connection is lingering
+ * May only be called by worker thread.
+ */
+static int start_lingering_close_blocking(event_conn_state_t *cs)
{
apr_status_t rv;
struct timeout_queue *q;
apr_socket_t *csd = cs->pfd.desc.s;
+
+ if (ap_start_lingering_close(cs->c)) {
+ notify_suspend(cs);
+ apr_socket_close(csd);
+ ap_push_pool(worker_queue_info, cs->p);
+ return 0;
+ }
+
#ifdef AP_DEBUG
{
rv = apr_socket_timeout_set(csd, 0);
#else
apr_socket_timeout_set(csd, 0);
#endif
+
cs->queue_timestamp = apr_time_now();
/*
* If some module requested a shortened waiting period, only wait for
cs->pub.state = CONN_STATE_LINGER_NORMAL;
}
apr_atomic_inc32(&lingering_count);
- if (in_worker) {
- notify_suspend(cs);
- }
- else {
- cs->c->sbh = NULL;
- }
+ notify_suspend(cs);
+
cs->pfd.reqevents = (
cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
}
/*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- * 1 if connection is lingering
- * May only be called by worker thread.
- */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
-{
- if (ap_start_lingering_close(cs->c)) {
- notify_suspend(cs);
- ap_push_pool(worker_queue_info, cs->p);
- return 0;
- }
- return start_lingering_close_common(cs, 1);
-}
-
-/*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
* Pre-condition: cs is not in any timeout queue and not in the pollset,
* timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- * 1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
*/
static int start_lingering_close_nonblocking(event_conn_state_t *cs)
{
- conn_rec *c = cs->c;
- apr_socket_t *csd = cs->pfd.desc.s;
-
- if (ap_prep_lingering_close(c)
- || c->aborted
- || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
- || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
- apr_socket_close(csd);
- ap_push_pool(worker_queue_info, cs->p);
- if (dying)
- ap_queue_interrupt_one(worker_queue);
- return 0;
+ event_conn_state_t *chain;
+ for (;;) {
+ cs->chain = chain = defer_linger_chain;
+ if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
+ chain) != chain) {
+ /* Race lost, try again */
+ continue;
+ }
+ return 1;
}
- return start_lingering_close_common(cs, 0);
}
/*
*/
static int stop_lingering_close(event_conn_state_t *cs)
{
- apr_status_t rv;
apr_socket_t *csd = ap_get_conn_socket(cs->c);
ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
"socket reached timeout in lingering-close state");
- rv = apr_socket_close(csd);
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468) "error closing socket");
- AP_DEBUG_ASSERT(0);
- }
+ abort_socket_nonblocking(csd);
ap_push_pool(worker_queue_info, cs->p);
if (dying)
ap_queue_interrupt_one(worker_queue);
c->current_thread = thd;
/* Subsequent request on a conn, and thread number is part of ID */
c->id = conn_id;
+
+ if (c->aborted) {
+ cs->pub.state = CONN_STATE_LINGER;
+ }
}
- if (c->clogging_input_filters && !c->aborted) {
+ if (cs->pub.state == CONN_STATE_LINGER) {
+ /* do lingering close below */
+ }
+ else if (c->clogging_input_filters) {
/* Since we have an input filter which 'clogs' the input stream,
* like mod_ssl used to, lets just do the normal read from input
* filters, like the Worker MPM does. Filters that need to write
}
apr_atomic_dec32(&clogged_count);
}
-
+ else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
read_request:
- if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
- if (!c->aborted) {
- ap_run_process_connection(c);
+ ap_run_process_connection(c);
- /* state will be updated upon return
- * fall thru to either wait for readability/timeout or
- * do lingering close
- */
- }
- else {
- cs->pub.state = CONN_STATE_LINGER;
- }
+ /* state will be updated upon return
+ * fall thru to either wait for readability/timeout or
+ * do lingering close
+ */
}
if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
return;
}
else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
- listener_may_exit) {
+ listener_may_exit) {
cs->pub.state = CONN_STATE_LINGER;
}
else if (c->data_in_input_filters) {
}
/*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
* this function may only be called by the listener
*/
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
- apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
+ apr_pool_t *ptrans)
{
- listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
- event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
apr_status_t rc;
- rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+ if (cs) {
+ csd = cs->pfd.desc.s;
+ ptrans = cs->p;
+ }
+ rc = ap_queue_push(worker_queue, csd, cs, ptrans);
if (rc != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+ "push2worker: ap_queue_push failed");
/* trash the connection; we couldn't queue the connected
* socket to a worker
*/
- apr_bucket_alloc_destroy(cs->bucket_alloc);
- apr_socket_close(cs->pfd.desc.s);
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
- ap_push_pool(worker_queue_info, cs->p);
+ if (csd) {
+ abort_socket_nonblocking(csd);
+ }
+ if (ptrans) {
+ ap_push_pool(worker_queue_info, ptrans);
+ }
+ signal_threads(ST_GRACEFUL);
}
return rc;
TO_QUEUE_REMOVE(remove_from_q, cs);
rc = apr_pollset_remove(event_pollset, &cs->pfd);
apr_thread_mutex_unlock(timeout_mutex);
+ TO_QUEUE_ELEM_INIT(cs);
+
/*
* Some of the pollset backends, like KQueue or Epoll
* automagically remove the FD if the socket is closed,
break;
}
- TO_QUEUE_ELEM_INIT(cs);
/* If we didn't get a worker immediately for a keep-alive
* request, we close the connection, so that the client can
* re-connect to a different process.
*/
if (!have_idle_worker) {
start_lingering_close_nonblocking(cs);
- break;
- }
- rc = push2worker(out_pfd, event_pollset);
- if (rc != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(03095)
- "push2worker failed");
}
- else {
+ else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
have_idle_worker = 0;
}
break;
+
case CONN_STATE_LINGER_NORMAL:
case CONN_STATE_LINGER_SHORT:
process_lingering_close(cs, out_pfd);
break;
+
default:
ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
ap_server_conf, APLOGNO(03096)
if (csd != NULL) {
conns_this_child--;
- rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
- if (rc != APR_SUCCESS) {
- /* trash the connection; we couldn't queue the connected
- * socket to a worker
- */
- apr_socket_close(csd);
- ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
- ap_server_conf, APLOGNO(03098)
- "ap_queue_push failed");
- ap_push_pool(worker_queue_info, ptrans);
- }
- else {
+ if (push2worker(NULL, csd, ptrans) == APR_SUCCESS) {
have_idle_worker = 0;
}
}
ps->keep_alive = 0;
}
+ /* If there are some lingering closes to defer (to a worker), schedule
+ * them now. We might wakeup a worker spuriously if another one empties
+ * defer_linger_chain in the meantime, but there also may be no active
+ * or all busy workers for an undefined time. In any case a deferred
+ * lingering close can't starve if we do that here since the chain is
+ * filled only above in the listener and it's emptied only in the
+ * worker(s); thus a NULL here means it will stay so while the listener
+ * waits (possibly indefinitely) in poll().
+ */
+ if (defer_linger_chain) {
+ get_worker(&have_idle_worker, 0, &workers_were_busy);
+ if (have_idle_worker
+ && defer_linger_chain /* re-test */
+ && push2worker(NULL, NULL, NULL) == APR_SUCCESS) {
+ have_idle_worker = 0;
+ }
+ }
+
if (listeners_disabled && !workers_were_busy
&& (int)apr_atomic_read32(&connection_count)
- (int)apr_atomic_read32(&lingering_count)
}
else {
is_idle = 0;
- worker_sockets[thread_slot] = csd;
- process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+ if (csd != NULL) {
+ worker_sockets[thread_slot] = csd;
+ process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+ worker_sockets[thread_slot] = NULL;
+ }
+ }
+
+ /* If there are deferred lingering closes, handle them now. */
+ while (!workers_may_exit) {
+ cs = defer_linger_chain;
+ if (!cs) {
+ break;
+ }
+ if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+ cs) != cs) {
+ /* Race lost, try again */
+ continue;
+ }
+ cs->chain = NULL;
+
+ worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
+#ifdef AP_DEBUG
+ rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+ AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+#else
+ apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+#endif
+ cs->pub.state = CONN_STATE_LINGER;
+ process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
worker_sockets[thread_slot] = NULL;
}
}
active_daemons_limit = server_limit;
threads_per_child = DEFAULT_THREADS_PER_CHILD;
max_workers = active_daemons_limit * threads_per_child;
+ defer_linger_chain = NULL;
had_healthy_child = 0;
ap_extended_status = 0;