-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) core: Add ap_mpm_register_socket_callback_timeout() API. [Eric Covener]
+
+ *) mod_proxy_wstunnel: Honor ProxyWebsocketIdleTimeout in asynchronous
+ processing mode. [Eric Covener]
+
*) mod_alias: Stop setting CONTEXT_PREFIX and CONTEXT_DOCUMENT environment
variables as a result of AliasMatch. [Eric Covener]
<usage>
<p>This directive imposes a maximum amount of time for the tunnel to be
- left open while idle. This directive is ignored if <directive>ProxyWebsocketAsync</directive>
- is enabled and the running MPM supports the necessary features</p>
+ left open while idle.</p>
</usage>
</directivesynopsis>
* 20140207.1 (2.5.0-dev) Add SSL reusable SNI to mod_proxy.h's proxy_conn_rec
* 20140207.2 (2.5.0-dev) Add proxy detach_backend hook
* 20140207.3 (2.5.0-dev) Add mod_ssl_openssl.h and OpenSSL-specific hooks
+ * 20140207.4 (2.5.0-dev) add ap_mpm_register_socket_callback_timeout
*/
#define MODULE_MAGIC_COOKIE 0x41503235UL /* "AP25" */
#ifndef MODULE_MAGIC_NUMBER_MAJOR
#define MODULE_MAGIC_NUMBER_MAJOR 20140207
#endif
-#define MODULE_MAGIC_NUMBER_MINOR 3 /* 0...n */
+#define MODULE_MAGIC_NUMBER_MINOR 4 /* 0...n */
/**
* Determine if the server's current MODULE_MAGIC_NUMBER is at least a
* APR_ENOTIMPL if no asynch support, or an apr_pollset_add error.
* @remark When activity is found on any 1 socket in the list, all are removed
* from the pollset and only 1 callback is issued.
- * @fn apr_status_t ap_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)
*/
AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s,
int for_read,
ap_mpm_callback_fn_t *cbfn,
void *baton);
+ /**
+ * Register a callback on the readability or writability on a group of sockets, with a timeout
+ * @param s Null-terminated list of sockets
+ * @param p pool for use between registration and callback
+ * @param for_read Whether the sockets are monitored for read or writability
+ * @param cbfn The callback function
+ * @param tofn The callback function if the timeout expires
+ * @param baton userdata for the callback function
+ * @param timeout timeout for I/O in microseconds, unlimited if <= 0
+ * @return APR_SUCCESS if all sockets could be added to a pollset,
+ * APR_ENOTIMPL if no asynch support, or an apr_pollset_add error.
+ * @remark When activity is found on any 1 socket in the list, all are removed
+ * from the pollset and only 1 callback is issued.
+ * @remark For each call, only one of tofn or cbfn will be called, never both.
+ */
+
+AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback_timeout(apr_socket_t **s,
+ apr_pool_t *p,
+ int for_read,
+ ap_mpm_callback_fn_t *cbfn,
+ ap_mpm_callback_fn_t *tofn,
+ void *baton,
+ apr_time_t timeout);
+
AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s,
apr_pool_t *p);
AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback,
(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton))
+/* register the specified callback, with timeout
+ * @ingroup hooks
+ *
+ */
+AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback_timeout,
+ (apr_socket_t **s, apr_pool_t *p, int for_read,
+ ap_mpm_callback_fn_t *cbfn,
+ ap_mpm_callback_fn_t *tofn,
+ void *baton,
+ apr_time_t timeout))
/**
* Unregister the specified callback
* @ingroup hooks
apr_socket_t *client_soc;
apr_pollset_t *pollset;
apr_bucket_brigade *bb;
- int is_client;
- apr_pool_t *subpool;
- char *scheme;
+ apr_pool_t *subpool; /* cleared before each suspend, destroyed when request ends */
+ char *scheme; /* required to release the proxy connection */
} ws_baton_t;
static int proxy_wstunnel_transfer(request_rec *r, conn_rec *c_i, conn_rec *c_o,
apr_bucket_brigade *bb, char *name);
+static void proxy_wstunnel_callback(void *b);
static int proxy_wstunnel_pump(ws_baton_t *baton, apr_time_t timeout, int try_async) {
request_rec *r = baton->r;
apr_bucket_brigade *bb = baton->bb;
while(1) {
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, "poll timeout is %"APR_TIME_T_FMT"ms %s", apr_time_as_msec(timeout), try_async ? "async" : "sync");
if ((rv = apr_pollset_poll(pollset, timeout, &pollcnt, &signalled))
!= APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(rv)) {
}
else if (APR_STATUS_IS_TIMEUP(rv)) {
if (try_async) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02542) "Attempting to go async");
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(02542) "Attempting to go async");
return SUSPENDED;
}
else {
return OK;
}
+static void proxy_wstunnel_finish(ws_baton_t *baton) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, "proxy_wstunnel_finish");
+ baton->proxy_connrec->close = 1; /* new handshake expected on each back-conn */
+ baton->r->connection->keepalive = AP_CONN_CLOSE;
+ ap_proxy_release_connection(baton->scheme, baton->proxy_connrec, baton->r->server);
+ ap_finalize_request_protocol(baton->r);
+ ap_lingering_close(baton->r->connection);
+ apr_socket_close(baton->client_soc);
+ ap_process_request_after_handler(baton->r); /* don't touch baton or r after here */
+}
+
+/* If neither socket becomes readable in the specified timeout,
+ * this callback will kill the request. We do not have to worry about
+ * having a cancel and a IO both queued.
+ */
+static void proxy_wstunnel_cancel_callback(void *b)
+{
+ ws_baton_t *baton = (ws_baton_t*)b;
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, "proxy_wstunnel_cancel_callback, IO timed out");
+ proxy_wstunnel_finish(baton);
+ return;
+}
+
+/* Invoked by the event loop when data is ready on either end.
+ * Pump both ends until they'd block and then start over again
+ * We don't need the invoke_mtx, since we never put multiple callback events
+ * in the queue.
+ */
static void proxy_wstunnel_callback(void *b) {
int status;
+ apr_socket_t *sockets[3] = {NULL, NULL, NULL};
ws_baton_t *baton = (ws_baton_t*)b;
proxyws_dir_conf *dconf = ap_get_module_config(baton->r->per_dir_config, &proxy_wstunnel_module);
-
- apr_socket_t *sockets[3] = {NULL, NULL, NULL};
- apr_thread_mutex_lock(baton->r->invoke_mtx);
apr_pool_clear(baton->subpool);
status = proxy_wstunnel_pump(baton, dconf->async_delay, dconf->is_async);
- sockets[0] = baton->client_soc;
- sockets[1] = baton->server_soc;
if (status == SUSPENDED) {
- ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+ sockets[0] = baton->client_soc;
+ sockets[1] = baton->server_soc;
+ ap_mpm_register_socket_callback_timeout(sockets, baton->subpool, 1,
+ proxy_wstunnel_callback,
+ proxy_wstunnel_cancel_callback,
+ baton,
+ dconf->idle_timeout);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, "proxy_wstunnel_callback suspend");
}
- else {
- ap_mpm_unregister_socket_callback(sockets, baton->subpool);
- ap_proxy_release_connection(baton->scheme, baton->proxy_connrec, baton->r->server);
- apr_thread_mutex_unlock(baton->r->invoke_mtx);
- ap_finalize_request_protocol(baton->r);
- ap_process_request_after_handler(baton->r);
- return;
+ else {
+ proxy_wstunnel_finish(baton);
}
- apr_thread_mutex_unlock(baton->r->invoke_mtx);
}
+
/*
* Canonicalise http-like URLs.
* scheme is the scheme for the URL
}
else {
status = proxy_wstunnel_pump(baton, dconf->async_delay, dconf->is_async);
+ apr_pool_clear(baton->subpool);
if (status == SUSPENDED) {
sockets[0] = baton->client_soc;
sockets[1] = baton->server_soc;
- status = ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+ status = ap_mpm_register_socket_callback_timeout(sockets, baton->subpool, 1,
+ proxy_wstunnel_callback,
+ proxy_wstunnel_cancel_callback,
+ baton,
+ dconf->idle_timeout);
if (status == APR_SUCCESS) {
return SUSPENDED;
}
else if (status == APR_ENOTIMPL) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02544) "No async support");
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(02544) "No async support");
status = proxy_wstunnel_pump(baton, dconf->idle_timeout, 0); /* force no async */
}
else {
"on if idle websockets connections should be monitored asyncronously"),
AP_INIT_TAKE1("ProxyWebsocketIdleTimeout", proxyws_set_idle, NULL, RSRC_CONF|ACCESS_CONF,
- "timeout for activity in either direction, unlimited by default. Not currently supported with ProxyWebsocketAsync"),
+ "timeout for activity in either direction, unlimited by default"),
AP_INIT_TAKE1("ProxyWebsocketAsyncDelay", proxyws_set_aysnch_delay, NULL, RSRC_CONF|ACCESS_CONF,
"amount of time to poll before going asyncronous"),
void *user_baton;
apr_pollfd_t **pfds;
int nsock;
- int signaled;
+ timer_event_t *cancel_event; /* If a timeout was requested, a pointer to the timer event */
+ unsigned int signaled:1;
} socket_callback_baton_t;
/* data retained by event across load/unload of the module
static apr_thread_mutex_t *g_timer_skiplist_mtx;
-static apr_status_t event_register_timed_callback(apr_time_t t,
- ap_mpm_callback_fn_t *cbfn,
- void *baton)
+static timer_event_t * event_get_timer_event(apr_time_t t,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton,
+ int insert,
+ apr_pollfd_t **remove)
{
timer_event_t *te;
/* oh yeah, and make locking smarter/fine grained. */
+
apr_thread_mutex_lock(g_timer_skiplist_mtx);
if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
te->cbfunc = cbfn;
te->baton = baton;
- /* XXXXX: optimize */
- te->when = t + apr_time_now();
-
- /* Okay, insert sorted by when.. */
- apr_skiplist_insert(timer_skiplist, (void *)te);
+ te->canceled = 0;
+ te->when = t;
+ te->remove = remove;
+ if (insert) {
+ /* Okay, insert sorted by when.. */
+ apr_skiplist_insert(timer_skiplist, (void *)te);
+ }
apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+ return te;
+}
+
+static apr_status_t event_register_timed_callback_ex(apr_time_t t,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton,
+ apr_pollfd_t **remove)
+{
+ event_get_timer_event(t + apr_time_now(), cbfn, baton, 1, remove);
return APR_SUCCESS;
}
+static apr_status_t event_register_timed_callback(apr_time_t t,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton)
+{
+ event_register_timed_callback_ex(t, cbfn, baton, NULL);
+ return APR_SUCCESS;
+}
-static apr_status_t event_register_socket_callback(apr_socket_t **s,
+static apr_status_t event_register_socket_callback_ex(apr_socket_t **s,
apr_pool_t *p,
int for_read,
ap_mpm_callback_fn_t *cbfn,
- void *baton)
+ ap_mpm_callback_fn_t *tofn,
+ void *baton,
+ apr_time_t timeout)
{
apr_status_t rc, final_rc= APR_SUCCESS;
int i = 0, nsock;
i++;
}
nsock = i;
- pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*));
+
+ pfds = apr_pcalloc(p, (nsock+1) * sizeof(apr_pollfd_t*));
pt->type = PT_USER;
pt->baton = scb;
scb->pfds = pfds;
for (i = 0; i<nsock; i++) {
- pfds[i] = apr_palloc(p, sizeof(apr_pollfd_t));
+ pfds[i] = apr_pcalloc(p, sizeof(apr_pollfd_t));
pfds[i]->desc_type = APR_POLL_SOCKET;
pfds[i]->reqevents = (for_read ? APR_POLLIN : APR_POLLOUT) | APR_POLLERR | APR_POLLHUP;
pfds[i]->desc.s = s[i];
+ pfds[i]->p = p;
pfds[i]->client_data = pt;
+ }
+
+ if (timeout > 0) {
+ /* XXX: This cancel timer event count fire before the pollset is updated */
+ scb->cancel_event = event_get_timer_event(timeout + apr_time_now(), tofn, baton, 1, pfds);
+ }
+ for (i = 0; i<nsock; i++) {
rc = apr_pollset_add(event_pollset, pfds[i]);
if (rc != APR_SUCCESS) final_rc = rc;
}
return final_rc;
}
+static apr_status_t event_register_socket_callback(apr_socket_t **s,
+ apr_pool_t *p,
+ int for_read,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton)
+{
+ return event_register_socket_callback_ex(s, p, for_read,
+ cbfn,
+ NULL, /* no timeout function */
+ baton,
+ 0 /* no timeout */);
+}
static apr_status_t event_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
{
int i = 0, nsock;
rc = apr_pollset_remove(event_pollset, pfds[i]);
if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) final_rc = APR_SUCCESS;
}
+
return final_rc;
}
/* TOOD: what should do here? ugh. */
}
#endif
+ now = apr_time_now();
+ apr_thread_mutex_lock(g_timer_skiplist_mtx);
+ ep = apr_skiplist_peek(timer_skiplist);
+ while (ep) {
+ if (ep->when < now + EVENT_FUDGE_FACTOR) {
+ apr_skiplist_pop(timer_skiplist, NULL);
+ if (!ep->canceled) {
+ if (ep->remove != NULL) {
+ for (apr_pollfd_t **pfds = (ep->remove); *pfds != NULL; pfds++) {
+ apr_pollset_remove(event_pollset, *pfds);
+ }
+ }
+ }
+ push_timer2worker(ep);
+ }
+ else {
+ break;
+ }
+ ep = apr_skiplist_peek(timer_skiplist);
+ }
+ apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+
rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
if (rc != APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(rc)) {
break;
}
- now = apr_time_now();
- apr_thread_mutex_lock(g_timer_skiplist_mtx);
- ep = apr_skiplist_peek(timer_skiplist);
- while (ep) {
- if (ep->when < now + EVENT_FUDGE_FACTOR) {
- apr_skiplist_pop(timer_skiplist, NULL);
- push_timer2worker(ep);
- }
- else {
- break;
- }
- ep = apr_skiplist_peek(timer_skiplist);
- }
- apr_thread_mutex_unlock(g_timer_skiplist_mtx);
-
while (num) {
pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
timer_event_t *te;
int i = 0;
socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton;
+ baton->cancel_event->canceled = 1;
- if (!baton->signaled) {
+ /* We only signal once per N sockets with this baton */
+ if (!(baton->signaled)) {
baton->signaled = 1;
- apr_thread_mutex_lock(g_timer_skiplist_mtx);
-
- if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
- te = APR_RING_FIRST(&timer_free_ring);
- APR_RING_REMOVE(te, link);
+ te = event_get_timer_event(-1 /* fake timer */,
+ baton->cbfunc,
+ baton->user_baton,
+ 0, /* don't insert it */
+ NULL /* no associated socket callback */);
+ /* remove other sockets in my set */
+ for (i = 0; i < baton->nsock; i++) {
+ apr_pollset_remove(event_pollset, baton->pfds[i]);
}
- else {
- te = apr_skiplist_alloc(timer_skiplist, sizeof(timer_event_t));
- APR_RING_ELEM_INIT(te, link);
- }
- apr_thread_mutex_unlock(g_timer_skiplist_mtx);
-
- for (i = 0; i < baton->nsock ; i++) {
- apr_pollset_remove(event_pollset, baton->pfds[i]);
- }
-
- te->cbfunc = baton->cbfunc;
- te->baton = baton->user_baton;
- te->when = -1;
push_timer2worker(te);
}
}
if (te != NULL) {
te->cbfunc(te->baton);
-
{
apr_thread_mutex_lock(g_timer_skiplist_mtx);
APR_RING_INSERT_TAIL(&timer_free_ring, te, timer_event_t, link);
APR_HOOK_MIDDLE);
ap_hook_mpm_register_socket_callback(event_register_socket_callback, NULL, NULL,
APR_HOOK_MIDDLE);
+ ap_hook_mpm_register_socket_callback_timeout(event_register_socket_callback_ex, NULL, NULL,
+ APR_HOOK_MIDDLE);
ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL,
APR_HOOK_MIDDLE);
ap_hook_pre_read_request(event_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE);
apr_time_t when;
ap_mpm_callback_fn_t *cbfunc;
void *baton;
+ int canceled;
+ apr_pollfd_t **remove;
};
-
struct fd_queue_t
{
APR_RING_HEAD(timers_t, timer_event_t) timers;
APR_HOOK_LINK(mpm_query) \
APR_HOOK_LINK(mpm_register_timed_callback) \
APR_HOOK_LINK(mpm_register_socket_callback) \
+ APR_HOOK_LINK(mpm_register_socket_callback_timeout) \
APR_HOOK_LINK(mpm_unregister_socket_callback) \
APR_HOOK_LINK(mpm_get_name) \
APR_HOOK_LINK(end_generation) \
AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback,
(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton),
(s, p, for_read, cbfn, baton), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback_timeout,
+ (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout),
+ (s, p, for_read, cbfn, tofn, baton, timeout), APR_ENOTIMPL)
AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_socket_callback,
(apr_socket_t **s, apr_pool_t *p),
(s, p), APR_ENOTIMPL)
{
return ap_run_mpm_register_socket_callback(s, p, for_read, cbfn, baton);
}
+AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback_timeout(apr_socket_t **s,
+ apr_pool_t *p,
+ int for_read,
+ ap_mpm_callback_fn_t *cbfn,
+ ap_mpm_callback_fn_t *tofn,
+ void *baton,
+ apr_time_t timeout)
+{
+ return ap_run_mpm_register_socket_callback_timeout(s, p, for_read, cbfn, tofn, baton, timeout);
+}
AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
{
return ap_run_mpm_unregister_socket_callback(s, p);