*/
#include "mod_proxy.h"
+#include "ap_mpm.h"
module AP_MODULE_DECLARE_DATA proxy_wstunnel_module;
+typedef struct ws_baton_t {
+ request_rec *r;
+ proxy_conn_rec *proxy_connrec;
+ apr_socket_t *server_soc;
+ apr_socket_t *client_soc;
+ apr_pollset_t *pollset;
+ apr_bucket_brigade *bb;
+ int is_client;
+ apr_pool_t *subpool;
+} ws_baton_t;
+
+static int proxy_wstunnel_transfer(request_rec *r, conn_rec *c_i, conn_rec *c_o,
+ apr_bucket_brigade *bb, char *name);
+
+static int proxy_wstunnel_pump(ws_baton_t *baton, apr_time_t timeout) {
+ int client_error = 0;
+ request_rec *r = baton->r;
+ conn_rec *c = r->connection;
+ proxy_conn_rec *conn = baton->proxy_connrec;
+ apr_socket_t *sock = conn->sock;
+ conn_rec *backconn = conn->connection;
+ const apr_pollfd_t *signalled;
+ apr_int32_t pollcnt, pi;
+ apr_int16_t pollevent;
+ apr_pollset_t *pollset = baton->pollset;
+ apr_socket_t *client_socket = baton->client_soc;
+ apr_status_t rv;
+ apr_bucket_brigade *bb = baton->bb;
+
+ while(1) {
+ if ((rv = apr_pollset_poll(pollset, timeout, &pollcnt, &signalled))
+ != APR_SUCCESS) {
+ if (APR_STATUS_IS_EINTR(rv)) {
+ continue;
+ }
+ else if (APR_STATUS_IS_TIMEUP(rv)) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Attempting to go asynch");
+ return SUSPENDED;
+ }
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()");
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445)
+ "woke from poll(), i=%d", pollcnt);
+
+ for (pi = 0; pi < pollcnt; pi++) {
+ const apr_pollfd_t *cur = &signalled[pi];
+
+ if (cur->desc.s == sock) {
+ pollevent = cur->rtnevents;
+ if (pollevent & APR_POLLIN) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446)
+ "sock was readable");
+ rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock");
+ }
+ else if ((pollevent & APR_POLLERR)
+ || (pollevent & APR_POLLHUP)) {
+ rv = APR_EPIPE;
+ ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447)
+ "err/hup on backconn");
+ }
+ if (rv != APR_SUCCESS)
+ client_error = 1;
+ }
+ else if (cur->desc.s == client_socket) {
+ pollevent = cur->rtnevents;
+ if (pollevent & APR_POLLIN) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448)
+ "client was readable");
+ rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client");
+ }
+ }
+ else {
+ rv = APR_EBADF;
+ ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449)
+ "unknown socket in pollset");
+ }
+
+ }
+ if (rv != APR_SUCCESS) {
+ break;
+ }
+
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+ "finished with poll() - cleaning up");
+
+ if (client_error) {
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ return OK;
+}
+
+static void proxy_wstunnel_callback(void *b) {
+ int status;
+ ws_baton_t *baton = (ws_baton_t*)b;
+ apr_thread_mutex_lock(baton->r->invoke_mtx);
+ apr_pool_clear(baton->subpool);
+ status = proxy_wstunnel_pump(baton, apr_time_from_sec(5));
+ if (status == SUSPENDED) {
+ apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL};
+ ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+ }
+ else {
+ apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL};
+ ap_mpm_unregister_socket_callback(sockets, baton->subpool);
+ apr_thread_mutex_unlock(baton->r->invoke_mtx);
+ ap_finalize_request_protocol(baton->r);
+ ap_process_request_after_handler(baton->r);
+ return;
+ }
+ apr_thread_mutex_unlock(baton->r->invoke_mtx);
+}
+
/*
* Canonicalise http-like URLs.
* scheme is the scheme for the URL
apr_status_t rv = APR_SUCCESS;
apr_pollset_t *pollset;
apr_pollfd_t pollfd;
- const apr_pollfd_t *signalled;
- apr_int32_t pollcnt, pi;
- apr_int16_t pollevent;
conn_rec *c = r->connection;
apr_socket_t *sock = conn->sock;
conn_rec *backconn = conn->connection;
- int client_error = 0;
char *buf;
apr_bucket_brigade *header_brigade;
apr_bucket *e;
char *old_te_val = NULL;
apr_bucket_brigade *bb = apr_brigade_create(p, c->bucket_alloc);
apr_socket_t *client_socket = ap_get_conn_socket(c);
+ ws_baton_t *baton = apr_pcalloc(r->pool, sizeof(ws_baton_t));
+ int status;
header_brigade = apr_brigade_create(p, backconn->bucket_alloc);
remove_reqtimeout(r->input_filters);
- while (1) { /* Infinite loop until error (one side closes the connection) */
- if ((rv = apr_pollset_poll(pollset, -1, &pollcnt, &signalled))
- != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(rv)) {
- continue;
- }
- ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()");
- return HTTP_INTERNAL_SERVER_ERROR;
+ baton->r = r;
+ baton->pollset = pollset;
+ baton->client_soc = client_socket;
+ baton->server_soc = sock;
+ baton->proxy_connrec = conn;
+ baton->bb = bb;
+ apr_pool_create(&baton->subpool, r->pool);
+
+ status = proxy_wstunnel_pump(baton, apr_time_from_sec(5));
+ if (status == SUSPENDED) {
+ apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL};
+ status = ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+ if (status == APR_SUCCESS) {
+ return SUSPENDED;
}
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445)
- "woke from poll(), i=%d", pollcnt);
-
- for (pi = 0; pi < pollcnt; pi++) {
- const apr_pollfd_t *cur = &signalled[pi];
-
- if (cur->desc.s == sock) {
- pollevent = cur->rtnevents;
- if (pollevent & APR_POLLIN) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446)
- "sock was readable");
- rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock");
- }
- else if ((pollevent & APR_POLLERR)
- || (pollevent & APR_POLLHUP)) {
- rv = APR_EPIPE;
- ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447)
- "err/hup on backconn");
- }
- if (rv != APR_SUCCESS)
- client_error = 1;
- }
- else if (cur->desc.s == client_socket) {
- pollevent = cur->rtnevents;
- if (pollevent & APR_POLLIN) {
- ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448)
- "client was readable");
- rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client");
- }
- }
- else {
- rv = APR_EBADF;
- ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449)
- "unknown socket in pollset");
- }
-
+ else if (status == APR_ENOTIMPL) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "No asynch support");
+ status = proxy_wstunnel_pump(baton, -1);
}
- if (rv != APR_SUCCESS) {
- break;
+ else {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r,
+ "error creating websosckets tunnel");
+ return HTTP_INTERNAL_SERVER_ERROR;
}
}
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
- "finished with poll() - cleaning up");
-
- if (client_error) {
- return HTTP_INTERNAL_SERVER_ERROR;
- }
- return OK;
-}
-
+ return status;
+}
+
/*
*/
static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker,
}
/* Do not close the socket */
- ap_proxy_release_connection(scheme, backend, r->server);
+ if (status != SUSPENDED) {
+ ap_proxy_release_connection(scheme, backend, r->server);
+ }
return status;
}
#if HAVE_SERF
, PT_SERF
#endif
+ , PT_USER
} poll_type_e;
typedef struct
void *baton;
} listener_poll_type;
+typedef struct
+{
+ ap_mpm_callback_fn_t *cbfunc;
+ void *user_baton;
+ apr_pollfd_t **pfds;
+ int nsock;
+ int signaled;
+} socket_callback_baton_t;
+
/* data retained by event across load/unload of the module
* allocated on first call to pre-config hook; located on
* subsequent calls to pre-config hook
return APR_SUCCESS;
}
+
+static apr_status_t event_register_socket_callback(apr_socket_t **s,
+ apr_pool_t *p,
+ int for_read,
+ ap_mpm_callback_fn_t *cbfn,
+ void *baton)
+{
+ apr_status_t rc, final_rc= APR_SUCCESS;
+ int i = 0, nsock;
+ socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb));
+ listener_poll_type *pt = apr_palloc(p, sizeof(*pt));
+ apr_pollfd_t **pfds = NULL;
+
+ while(s[i] != NULL) {
+ i++;
+ }
+ nsock = i;
+ pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*));
+
+ pt->type = PT_USER;
+ pt->baton = scb;
+
+ scb->cbfunc = cbfn;
+ scb->user_baton = baton;
+ scb->nsock = nsock;
+ scb->pfds = pfds;
+
+ for (i = 0; i<nsock; i++) {
+ pfds[i] = apr_palloc(p, sizeof(apr_pollfd_t));
+ pfds[i]->desc_type = APR_POLL_SOCKET;
+ pfds[i]->reqevents = (for_read ? APR_POLLIN : APR_POLLOUT) | APR_POLLERR | APR_POLLHUP;
+ pfds[i]->desc.s = s[i];
+ pfds[i]->client_data = pt;
+ rc = apr_pollset_add(event_pollset, pfds[i]);
+ if (rc != APR_SUCCESS) final_rc = rc;
+ }
+ return final_rc;
+}
+static apr_status_t event_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
+{
+ int i = 0, nsock;
+ apr_status_t final_rc = APR_SUCCESS;
+ apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd));
+ apr_pollfd_t **pfds = NULL;
+
+ while(s[i] != NULL) {
+ i++;
+ }
+ nsock = i;
+
+ pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*));
+
+ for (i = 0; i<nsock; i++) {
+ apr_status_t rc;
+ pfds[i] = apr_pcalloc(p, sizeof(apr_pollfd_t));
+ pfds[i]->desc_type = APR_POLL_SOCKET;
+ pfds[i]->reqevents = APR_POLLERR | APR_POLLHUP;
+ pfds[i]->desc.s = s[i];
+ pfds[i]->client_data = NULL;
+ rc = apr_pollset_remove(event_pollset, pfds[i]);
+ if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) final_rc = APR_SUCCESS;
+ }
+ return final_rc;
+}
+
/*
* Close socket and clean up if remote closed its end while we were in
* lingering close.
/* XXXX: this doesn't require get_worker() */
serf_event_trigger(g_serf, pt->baton, out_pfd);
}
+
#endif
+ else if (pt->type == PT_USER) {
+ /* masquerade as a timer event that is firing */
+ timer_event_t *te;
+ int i = 0;
+ socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton;
+
+ if (!baton->signaled) {
+ baton->signaled = 1;
+ apr_thread_mutex_lock(g_timer_skiplist_mtx);
+
+ if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
+ te = APR_RING_FIRST(&timer_free_ring);
+ APR_RING_REMOVE(te, link);
+ }
+ else {
+ te = ap_skiplist_alloc(timer_skiplist, sizeof(timer_event_t));
+ APR_RING_ELEM_INIT(te, link);
+ }
+ apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+
+ for (i = 0; i < baton->nsock ; i++) {
+ apr_pollset_remove(event_pollset, baton->pfds[i]);
+ }
+
+ te->cbfunc = baton->cbfunc;
+ te->baton = baton->user_baton;
+ te->when = -1;
+
+ push_timer2worker(te);
+ }
+ apr_pollset_remove(event_pollset, out_pfd);
+ }
out_pfd++;
num--;
} /* while for processing poll */
ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_mpm_register_timed_callback(event_register_timed_callback, NULL, NULL,
APR_HOOK_MIDDLE);
+ ap_hook_mpm_register_socket_callback(event_register_socket_callback, NULL, NULL,
+ APR_HOOK_MIDDLE);
+ ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL,
+ APR_HOOK_MIDDLE);
+
ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE);
}
APR_HOOK_LINK(mpm)
APR_HOOK_LINK(mpm_query)
APR_HOOK_LINK(mpm_register_timed_callback)
+ APR_HOOK_LINK(mpm_register_socket_callback)
+ APR_HOOK_LINK(mpm_unregister_socket_callback)
APR_HOOK_LINK(mpm_get_name)
APR_HOOK_LINK(end_generation)
APR_HOOK_LINK(child_status)
APR_HOOK_LINK(mpm)
APR_HOOK_LINK(mpm_query)
APR_HOOK_LINK(mpm_register_timed_callback)
+ APR_HOOK_LINK(mpm_register_socket_callback)
+ APR_HOOK_LINK(mpm_unregister_socket_callback)
APR_HOOK_LINK(mpm_get_name)
APR_HOOK_LINK(end_generation)
APR_HOOK_LINK(child_status)
AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_timed_callback,
(apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton),
(t, cbfn, baton), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback,
+ (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton),
+ (s, p, for_read, cbfn, baton), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_socket_callback,
+ (apr_socket_t **s, apr_pool_t *p),
+ (s, p), APR_ENOTIMPL)
+
AP_IMPLEMENT_HOOK_VOID(end_generation,
(server_rec *s, ap_generation_t gen),
(s, gen))
{
return ap_run_mpm_register_timed_callback(t, cbfn, baton);
}
+AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)
+{
+ return ap_run_mpm_register_socket_callback(s, p, for_read, cbfn, baton);
+}
+AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
+{
+ return ap_run_mpm_unregister_socket_callback(s, p);
+}
AP_DECLARE(const char *)ap_show_mpm(void)
{