ap_send_error_response(r_1st_err, recursive_error);
}
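+/* Check whether a pipelined request awaits on this connection.  Unlike
+ * the old check_pipeline_flush(), this no longer flushes the output
+ * filters itself; it only updates c->data_in_input_filters so the
+ * caller can decide whether to flush now or process the next request.
+ */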
-static void check_pipeline_flush(conn_rec *c)
+static void check_pipeline(conn_rec *c)
{
- apr_bucket *e;
- apr_bucket_brigade *bb;
-
- /* ### if would be nice if we could PEEK without a brigade. that would
- ### allow us to defer creation of the brigade to when we actually
- ### need to send a FLUSH. */
- bb = apr_brigade_create(c->pool, c->bucket_alloc);
-
- /* Flush the filter contents if:
- *
- * 1) the connection will be closed
- * 2) there isn't a request ready to be read
- */
/* ### is zero correct? that means "read one line" */
if (c->keepalive != AP_CONN_CLOSE) {
+ apr_bucket_brigade *bb = apr_brigade_create(c->pool, c->bucket_alloc);
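+ /* Speculative, non-blocking read: AP_MODE_EATCRLF eats any CRLF
+ * pairs left between pipelined requests, and a successful read
+ * means another request is already waiting in the input filters.
+ */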
if (ap_get_brigade(c->input_filters, bb, AP_MODE_EATCRLF,
APR_NONBLOCK_READ, 0) != APR_SUCCESS) {
c->data_in_input_filters = 0; /* we got APR_EOF or an error */
}
else {
c->data_in_input_filters = 1;
- return; /* don't flush */
}
}
-
- e = apr_bucket_flush_create(c->bucket_alloc);
-
- /* We just send directly to the connection based filters. At
- * this point, we know that we have seen all of the data
- * (request finalization sent an EOS bucket, which empties all
- * of the request filters). We just want to flush the buckets
- * if something hasn't been sent to the network yet.
- */
- APR_BRIGADE_INSERT_HEAD(bb, e);
- ap_pass_brigade(c->output_filters, bb);
}
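+/* Process one request on the connection, but leave write completion
+ * to the caller: the connection is left in CONN_STATE_WRITE_COMPLETION
+ * and no flush is sent here.
+ */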
-void ap_process_request(request_rec *r)
+void ap_process_async_request(request_rec *r)
{
int access_status;
apr_bucket_brigade *bb;
*/
c->cs->state = CONN_STATE_WRITE_COMPLETION;
- check_pipeline_flush(c);
+ check_pipeline(c);
if (ap_extended_status)
ap_time_process_request(c->sbh, STOP_PREQUEST);
}
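+/* Synchronous wrapper around ap_process_async_request() for MPMs that
+ * do not drive write completion from an event pollset.
+ */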
+void ap_process_request(request_rec *r)
+{
+ apr_bucket_brigade *bb;
+ apr_bucket *b;
+ conn_rec *c = r->connection;
+
+ ap_process_async_request(r);
+
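+ /* Flush only if no pipelined request is waiting; otherwise the
+ * flush is deferred so responses can be coalesced.
+ */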
+ if (!c->data_in_input_filters) {
+ bb = apr_brigade_create(c->pool, c->bucket_alloc);
+ b = apr_bucket_flush_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_HEAD(bb, b);
+ ap_pass_brigade(c->output_filters, bb);
+ }
+ if (ap_extended_status) {
+ ap_time_process_request(c->sbh, STOP_PREQUEST);
+ }
+}
+
static apr_table_t *rename_original_env(apr_pool_t *p, apr_table_t *t)
{
const apr_array_header_t *env_arr = apr_table_elts(t);
apr_thread_mutex_t *timeout_mutex;
APR_RING_HEAD(timeout_head_t, conn_state_t);
-static struct timeout_head_t timeout_head;
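+/* Two timeout queues: timeout_head for connections in write completion,
+ * keepalive_timeout_head for connections between requests.  Keeping them
+ * separate preserves expiration order within each queue, because all
+ * entries in a given queue share the same timeout duration.
+ */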
+static struct timeout_head_t timeout_head, keepalive_timeout_head;
static apr_pollset_t *event_pollset;
pt->status = 1;
pt->baton = cs;
cs->pfd.client_data = pt;
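+ /* Self-link the ring element so a later APR_RING_REMOVE is safe
+ * even before the connection is first put on a timeout queue.
+ */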
+ APR_RING_ELEM_INIT(cs, timeout_list);
ap_update_vhost_given_ip(c);
else {
c = cs->c;
c->sbh = sbh;
+ pt = cs->pfd.client_data;
}
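+ /* The write-completion path below jumps back here when a pipelined
+ * request is already buffered, so it can be read without another
+ * trip through the pollset. */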
+read_request:
if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
if (!c->aborted) {
ap_run_process_connection(c);
cs->state = CONN_STATE_LINGER;
}
}
+
+ if (cs->state == CONN_STATE_WRITE_COMPLETION) {
+ /* For now, do blocking writes in this thread to transfer the
+ * rest of the response. If data remains buffered afterwards, the
+ * connection is handed back to the event pollset below.
+ * TODO: do all of write completion asynchronously.
+ */
+ ap_filter_t *output_filter = c->output_filters;
+ apr_status_t rv;
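+ /* Walk to the last filter in the chain (the core output filter)
+ * and call it with no new brigade, so it can try to write out
+ * whatever it has buffered.
+ */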
+ while (output_filter->next != NULL) {
+ output_filter = output_filter->next;
+ }
+ rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_WARNING, rv, ap_server_conf,
+ "network write failure in core output filter");
+ cs->state = CONN_STATE_LINGER;
+ }
+ else if (c->data_in_output_filters) {
+ /* Still in CONN_STATE_WRITE_COMPLETION:
+ * Set a write timeout for this connection, and let the
+ * event thread poll for writeability.
+ */
+ cs->expiration_time = ap_server_conf->timeout + time_now;
+ apr_thread_mutex_lock(timeout_mutex);
+ APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+ apr_thread_mutex_unlock(timeout_mutex);
+ pt->status = 0;
+ cs->pfd.reqevents = APR_POLLOUT | APR_POLLHUP | APR_POLLERR;
+ rc = apr_pollset_add(event_pollset, &cs->pfd);
+ return 1;
+ }
+ else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
+ ap_graceful_stop_signalled()) {
+ cs->state = CONN_STATE_LINGER;
+ }
+ else if (c->data_in_input_filters) {
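+ /* A pipelined request is already buffered; process it now
+ * instead of returning to the pollset. */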
+ cs->state = CONN_STATE_READ_REQUEST_LINE;
+ goto read_request;
+ }
+ else {
+ cs->state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
+ }
+ }
if (cs->state == CONN_STATE_LINGER) {
ap_lingering_close(c);
*/
cs->expiration_time = ap_server_conf->keep_alive_timeout + time_now;
apr_thread_mutex_lock(timeout_mutex);
- APR_RING_INSERT_TAIL(&timeout_head, cs, conn_state_t, timeout_list);
+ APR_RING_INSERT_TAIL(&keepalive_timeout_head, cs, conn_state_t, timeout_list);
apr_thread_mutex_unlock(timeout_mutex);
pt->status = 0;
- /* Add work to pollset. These are always read events */
+ /* Add work to pollset. */
+ cs->pfd.reqevents = APR_POLLIN;
rc = apr_pollset_add(event_pollset, &cs->pfd);
if (rc != APR_SUCCESS) {
}
APR_RING_INIT(&timeout_head, conn_state_t, timeout_list);
+ APR_RING_INIT(&keepalive_timeout_head, conn_state_t, timeout_list);
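+ /* APR_POLLSET_NOCOPY makes the pollset reference the caller's
+ * apr_pollfd_t structures instead of copying them, so each pfd
+ * must remain valid for as long as it is in the pollset.
+ */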
/* Create the main pollset */
rc = apr_pollset_create(&event_pollset,
ap_threads_per_child,
- tpool, APR_POLLSET_THREADSAFE);
+ tpool, APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
if (rc != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
"apr_pollset_create with Thread Safety failed. "
}
for (lr = ap_listeners; lr != NULL; lr = lr->next) {
- apr_pollfd_t pfd = { 0 };
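+ /* With APR_POLLSET_NOCOPY in effect, this pfd is referenced by
+ * the pollset, so allocate it from tpool rather than the stack. */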
+ apr_pollfd_t *pfd = apr_palloc(tpool, sizeof(*pfd));
pt = apr_pcalloc(tpool, sizeof(*pt));
- pfd.desc_type = APR_POLL_SOCKET;
- pfd.desc.s = lr->sd;
- pfd.reqevents = APR_POLLIN;
+ pfd->desc_type = APR_POLL_SOCKET;
+ pfd->desc.s = lr->sd;
+ pfd->reqevents = APR_POLLIN;
pt->type = PT_ACCEPT;
pt->baton = lr;
- pfd.client_data = pt;
+ pfd->client_data = pt;
- apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1);
- apr_pollset_add(event_pollset, &pfd);
+ apr_socket_opt_set(pfd->desc.s, APR_SO_NONBLOCK, 1);
+ apr_pollset_add(event_pollset, pfd);
}
/* Unblock the signal used to wake this thread up, and set a handler for
case CONN_STATE_CHECK_REQUEST_LINE_READABLE:
cs->state = CONN_STATE_READ_REQUEST_LINE;
break;
+ case CONN_STATE_WRITE_COMPLETION:
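+ /* The socket is writable again (or has hung up); a worker
+ * will resume write completion. */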
+ break;
default:
ap_log_error(APLOG_MARK, APLOG_ERR, rc,
ap_server_conf,
apr_thread_mutex_lock(timeout_mutex);
APR_RING_REMOVE(cs, timeout_list);
apr_thread_mutex_unlock(timeout_mutex);
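+ /* Mark the element as no longer being on any timeout queue. */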
+ APR_RING_ELEM_INIT(cs, timeout_list);
rc = push2worker(out_pfd, event_pollset);
if (rc != APR_SUCCESS) {
/* handle timed out sockets */
apr_thread_mutex_lock(timeout_mutex);
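+ /* Each queue is in expiration order, so each scan below can stop
+ * at the first entry that has not yet expired. */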
- cs = APR_RING_FIRST(&timeout_head);
+ /* Step 1: keepalive timeouts */
+ cs = APR_RING_FIRST(&keepalive_timeout_head);
timeout_time = time_now + TIMEOUT_FUDGE_FACTOR;
- while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+ while (!APR_RING_EMPTY(&keepalive_timeout_head, conn_state_t, timeout_list)
&& cs->expiration_time < timeout_time
&& get_worker(&have_idle_worker)) {
*/
}
have_idle_worker = 0;
+ cs = APR_RING_FIRST(&keepalive_timeout_head);
+ }
+
+ /* Step 2: write completion timeouts */
+ cs = APR_RING_FIRST(&timeout_head);
+ while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list)
+ && cs->expiration_time < timeout_time
+ && get_worker(&have_idle_worker)) {
+
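+ /* The write has stalled past the timeout; have a worker do a
+ * lingering close. */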
+ cs->state = CONN_STATE_LINGER;
+ APR_RING_REMOVE(cs, timeout_list);
+ rc = push2worker(&cs->pfd, event_pollset);
+ if (rc != APR_SUCCESS) {
+ return NULL;
+ }
+ have_idle_worker = 0;
cs = APR_RING_FIRST(&timeout_head);
}
+
apr_thread_mutex_unlock(timeout_mutex);
} /* listener main loop */
if (restart_num++ == 1) {
is_graceful = 0;
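+ /* Keep these flags in sync with the main event_pollset creation. */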
rv = apr_pollset_create(&event_pollset, 1, plog,
- APR_POLLSET_THREADSAFE);
+ APR_POLLSET_THREADSAFE | APR_POLLSET_NOCOPY);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
"Couldn't create a Thread Safe Pollset. "