From f632ed1df1cca199f68e4ca7cbf042eaff435543 Mon Sep 17 00:00:00 2001 From: Jim Jagielski Date: Mon, 17 Jun 2013 12:43:52 +0000 Subject: [PATCH] Fold in Eric Covener's socket callback git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1493741 13f79535-47bb-0310-9956-ffa450edef68 --- include/ap_mpm.h | 23 ++++ include/mpm_common.h | 7 + modules/proxy/mod_proxy_wstunnel.c | 207 ++++++++++++++++++++--------- server/mpm/event/event.c | 113 ++++++++++++++++ server/mpm_common.c | 19 +++ 5 files changed, 309 insertions(+), 60 deletions(-) diff --git a/include/ap_mpm.h b/include/ap_mpm.h index 71f8f47caa..6b345ed6ae 100644 --- a/include/ap_mpm.h +++ b/include/ap_mpm.h @@ -167,6 +167,29 @@ AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton); +/** + * Register a callback on the readability or writability on a group of sockets + * @param s Null-terminated list of sockets + * @param p pool for use between registration and callback + * @param for_read Whether the sockets are monitored for read or writability + * @param cbfn The callback function + * @param baton userdata for the callback function + * @return APR_SUCCESS if all sockets could be added to a pollset, + * APR_ENOTIMPL if no asynch support, or an apr_pollset_add error. + * @remark When activity is found on any 1 socket in the list, all are removed + * from the pollset and only 1 callback is issued. + * @fn apr_status_t (p_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton) + */ + +AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, + apr_pool_t *p, + int for_read, + ap_mpm_callback_fn_t *cbfn, + void *baton); + +AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, + apr_pool_t *p); + typedef enum mpm_child_status { MPM_CHILD_STARTED, MPM_CHILD_EXITED, diff --git a/include/mpm_common.h b/include/mpm_common.h index c555b72c2f..8ba20d2ce2 100644 --- a/include/mpm_common.h +++ b/include/mpm_common.h @@ -409,6 +409,13 @@ AP_DECLARE_HOOK(int, mpm_query, (int query_code, int *result, apr_status_t *rv)) AP_DECLARE_HOOK(apr_status_t, mpm_register_timed_callback, (apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton)) +/* register the specified callback */ +AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback, + (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)) +/* unregister the specified callback */ +AP_DECLARE_HOOK(apr_status_t, mpm_unregister_socket_callback, + (apr_socket_t **s, apr_pool_t *p)) + /* get MPM name (e.g., "prefork" or "event") */ AP_DECLARE_HOOK(const char *,mpm_get_name,(void)) diff --git a/modules/proxy/mod_proxy_wstunnel.c b/modules/proxy/mod_proxy_wstunnel.c index 365a20549e..e0fbc142a8 100644 --- a/modules/proxy/mod_proxy_wstunnel.c +++ b/modules/proxy/mod_proxy_wstunnel.c @@ -15,9 +15,127 @@ */ #include "mod_proxy.h" +#include "ap_mpm.h" module AP_MODULE_DECLARE_DATA proxy_wstunnel_module; +typedef struct ws_baton_t { + request_rec *r; + proxy_conn_rec *proxy_connrec; + apr_socket_t *server_soc; + apr_socket_t *client_soc; + apr_pollset_t *pollset; + apr_bucket_brigade *bb; + int is_client; + apr_pool_t *subpool; +} ws_baton_t; + +static int proxy_wstunnel_transfer(request_rec *r, conn_rec *c_i, conn_rec *c_o, + apr_bucket_brigade *bb, char *name); + +static int proxy_wstunnel_pump(ws_baton_t *baton, apr_time_t timeout) { + int client_error = 0; + request_rec *r = baton->r; + conn_rec *c = r->connection; + proxy_conn_rec *conn = baton->proxy_connrec; + apr_socket_t *sock = conn->sock; + conn_rec *backconn = conn->connection; + const apr_pollfd_t *signalled; + apr_int32_t pollcnt, pi; + apr_int16_t pollevent; + apr_pollset_t *pollset = baton->pollset; + apr_socket_t *client_socket = baton->client_soc; + apr_status_t rv; + apr_bucket_brigade *bb = baton->bb; + + while(1) { + if ((rv = apr_pollset_poll(pollset, timeout, &pollcnt, &signalled)) + != APR_SUCCESS) { + if (APR_STATUS_IS_EINTR(rv)) { + continue; + } + else if (APR_STATUS_IS_TIMEUP(rv)) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Attempting to go asynch"); + return SUSPENDED; + } + else { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()"); + return HTTP_INTERNAL_SERVER_ERROR; + } + } + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445) + "woke from poll(), i=%d", pollcnt); + + for (pi = 0; pi < pollcnt; pi++) { + const apr_pollfd_t *cur = &signalled[pi]; + + if (cur->desc.s == sock) { + pollevent = cur->rtnevents; + if (pollevent & APR_POLLIN) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446) + "sock was readable"); + rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock"); + } + else if ((pollevent & APR_POLLERR) + || (pollevent & APR_POLLHUP)) { + rv = APR_EPIPE; + ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447) + "err/hup on backconn"); + } + if (rv != APR_SUCCESS) + client_error = 1; + } + else if (cur->desc.s == client_socket) { + pollevent = cur->rtnevents; + if (pollevent & APR_POLLIN) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448) + "client was readable"); + rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client"); + } + } + else { + rv = APR_EBADF; + ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449) + "unknown socket in pollset"); + } + + } + if (rv != APR_SUCCESS) { + break; + } + + } + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, + "finished with poll() - cleaning up"); + + if (client_error) { + return HTTP_INTERNAL_SERVER_ERROR; + } + return OK; +} + +static void proxy_wstunnel_callback(void *b) { + int status; + ws_baton_t *baton = (ws_baton_t*)b; + apr_thread_mutex_lock(baton->r->invoke_mtx); + apr_pool_clear(baton->subpool); + status = proxy_wstunnel_pump(baton, apr_time_from_sec(5)); + if (status == SUSPENDED) { + apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL}; + ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton); + } + else { + apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL}; + ap_mpm_unregister_socket_callback(sockets, baton->subpool); + apr_thread_mutex_unlock(baton->r->invoke_mtx); + ap_finalize_request_protocol(baton->r); + ap_process_request_after_handler(baton->r); + return; + } + apr_thread_mutex_unlock(baton->r->invoke_mtx); +} + /* * Canonicalise http-like URLs. * scheme is the scheme for the URL @@ -172,13 +290,9 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r, apr_status_t rv = APR_SUCCESS; apr_pollset_t *pollset; apr_pollfd_t pollfd; - const apr_pollfd_t *signalled; - apr_int32_t pollcnt, pi; - apr_int16_t pollevent; conn_rec *c = r->connection; apr_socket_t *sock = conn->sock; conn_rec *backconn = conn->connection; - int client_error = 0; char *buf; apr_bucket_brigade *header_brigade; apr_bucket *e; @@ -186,6 +300,8 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r, char *old_te_val = NULL; apr_bucket_brigade *bb = apr_brigade_create(p, c->bucket_alloc); apr_socket_t *client_socket = ap_get_conn_socket(c); + ws_baton_t *baton = apr_pcalloc(r->pool, sizeof(ws_baton_t)); + int status; header_brigade = apr_brigade_create(p, backconn->bucket_alloc); @@ -240,66 +356,35 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r, remove_reqtimeout(r->input_filters); - while (1) { /* Infinite loop until error (one side closes the connection) */ - if ((rv = apr_pollset_poll(pollset, -1, &pollcnt, &signalled)) - != APR_SUCCESS) { - if (APR_STATUS_IS_EINTR(rv)) { - continue; - } - ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()"); - return HTTP_INTERNAL_SERVER_ERROR; + baton->r = r; + baton->pollset = pollset; + baton->client_soc = client_socket; + baton->server_soc = sock; + baton->proxy_connrec = conn; + baton->bb = bb; + apr_pool_create(&baton->subpool, r->pool); + + status = proxy_wstunnel_pump(baton, apr_time_from_sec(5)); + if (status == SUSPENDED) { + apr_socket_t *sockets[3] = {baton->client_soc, baton->server_soc, NULL}; + status = ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton); + if (status == APR_SUCCESS) { + return SUSPENDED; } - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445) - "woke from poll(), i=%d", pollcnt); - - for (pi = 0; pi < pollcnt; pi++) { - const apr_pollfd_t *cur = &signalled[pi]; - - if (cur->desc.s == sock) { - pollevent = cur->rtnevents; - if (pollevent & APR_POLLIN) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446) - "sock was readable"); - rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock"); - } - else if ((pollevent & APR_POLLERR) - || (pollevent & APR_POLLHUP)) { - rv = APR_EPIPE; - ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447) - "err/hup on backconn"); - } - if (rv != APR_SUCCESS) - client_error = 1; - } - else if (cur->desc.s == client_socket) { - pollevent = cur->rtnevents; - if (pollevent & APR_POLLIN) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448) - "client was readable"); - rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client"); - } - } - else { - rv = APR_EBADF; - ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449) - "unknown socket in pollset"); - } - + else if (status == APR_ENOTIMPL) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "No asynch support"); + status = proxy_wstunnel_pump(baton, -1); } - if (rv != APR_SUCCESS) { - break; + else { + ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r, + "error creating websosckets tunnel"); + return HTTP_INTERNAL_SERVER_ERROR; } } - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, - "finished with poll() - cleaning up"); - - if (client_error) { - return HTTP_INTERNAL_SERVER_ERROR; - } - return OK; -} - + return status; +} + /* */ static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker, @@ -378,7 +463,9 @@ static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker, } /* Do not close the socket */ - ap_proxy_release_connection(scheme, backend, r->server); + if (status != SUSPENDED) { + ap_proxy_release_connection(scheme, backend, r->server); + } return status; } diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index f013cf5ec1..004f523099 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -295,6 +295,7 @@ typedef enum #if HAVE_SERF , PT_SERF #endif + , PT_USER } poll_type_e; typedef struct @@ -303,6 +304,15 @@ typedef struct void *baton; } listener_poll_type; +typedef struct +{ + ap_mpm_callback_fn_t *cbfunc; + void *user_baton; + apr_pollfd_t **pfds; + int nsock; + int signaled; +} socket_callback_baton_t; + /* data retained by event across load/unload of the module * allocated on first call to pre-config hook; located on * subsequent calls to pre-config hook @@ -1340,6 +1350,71 @@ static apr_status_t event_register_timed_callback(apr_time_t t, return APR_SUCCESS; } + +static apr_status_t event_register_socket_callback(apr_socket_t **s, + apr_pool_t *p, + int for_read, + ap_mpm_callback_fn_t *cbfn, + void *baton) +{ + apr_status_t rc, final_rc= APR_SUCCESS; + int i = 0, nsock; + socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb)); + listener_poll_type *pt = apr_palloc(p, sizeof(*pt)); + apr_pollfd_t **pfds = NULL; + + while(s[i] != NULL) { + i++; + } + nsock = i; + pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*)); + + pt->type = PT_USER; + pt->baton = scb; + + scb->cbfunc = cbfn; + scb->user_baton = baton; + scb->nsock = nsock; + scb->pfds = pfds; + + for (i = 0; idesc_type = APR_POLL_SOCKET; + pfds[i]->reqevents = (for_read ? APR_POLLIN : APR_POLLOUT) | APR_POLLERR | APR_POLLHUP; + pfds[i]->desc.s = s[i]; + pfds[i]->client_data = pt; + rc = apr_pollset_add(event_pollset, pfds[i]); + if (rc != APR_SUCCESS) final_rc = rc; + } + return final_rc; +} +static apr_status_t event_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p) +{ + int i = 0, nsock; + apr_status_t final_rc = APR_SUCCESS; + apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd)); + apr_pollfd_t **pfds = NULL; + + while(s[i] != NULL) { + i++; + } + nsock = i; + + pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*)); + + for (i = 0; idesc_type = APR_POLL_SOCKET; + pfds[i]->reqevents = APR_POLLERR | APR_POLLHUP; + pfds[i]->desc.s = s[i]; + pfds[i]->client_data = NULL; + rc = apr_pollset_remove(event_pollset, pfds[i]); + if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) final_rc = APR_SUCCESS; + } + return final_rc; +} + /* * Close socket and clean up if remote closed its end while we were in * lingering close. @@ -1722,7 +1797,40 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* XXXX: this doesn't require get_worker() */ serf_event_trigger(g_serf, pt->baton, out_pfd); } + #endif + else if (pt->type == PT_USER) { + /* masquerade as a timer event that is firing */ + timer_event_t *te; + int i = 0; + socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton; + + if (!baton->signaled) { + baton->signaled = 1; + apr_thread_mutex_lock(g_timer_skiplist_mtx); + + if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) { + te = APR_RING_FIRST(&timer_free_ring); + APR_RING_REMOVE(te, link); + } + else { + te = ap_skiplist_alloc(timer_skiplist, sizeof(timer_event_t)); + APR_RING_ELEM_INIT(te, link); + } + apr_thread_mutex_unlock(g_timer_skiplist_mtx); + + for (i = 0; i < baton->nsock ; i++) { + apr_pollset_remove(event_pollset, baton->pfds[i]); + } + + te->cbfunc = baton->cbfunc; + te->baton = baton->user_baton; + te->when = -1; + + push_timer2worker(te); + } + apr_pollset_remove(event_pollset, out_pfd); + } out_pfd++; num--; } /* while for processing poll */ @@ -3246,6 +3354,11 @@ static void event_hooks(apr_pool_t * p) ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_mpm_register_timed_callback(event_register_timed_callback, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_mpm_register_socket_callback(event_register_socket_callback, NULL, NULL, + APR_HOOK_MIDDLE); + ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL, + APR_HOOK_MIDDLE); + ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE); } diff --git a/server/mpm_common.c b/server/mpm_common.c index e33bd11c9e..71e611356f 100644 --- a/server/mpm_common.c +++ b/server/mpm_common.c @@ -70,6 +70,8 @@ APR_HOOK_STRUCT( APR_HOOK_LINK(mpm) APR_HOOK_LINK(mpm_query) APR_HOOK_LINK(mpm_register_timed_callback) + APR_HOOK_LINK(mpm_register_socket_callback) + APR_HOOK_LINK(mpm_unregister_socket_callback) APR_HOOK_LINK(mpm_get_name) APR_HOOK_LINK(end_generation) APR_HOOK_LINK(child_status) @@ -83,6 +85,8 @@ APR_HOOK_STRUCT( APR_HOOK_LINK(mpm) APR_HOOK_LINK(mpm_query) APR_HOOK_LINK(mpm_register_timed_callback) + APR_HOOK_LINK(mpm_register_socket_callback) + APR_HOOK_LINK(mpm_unregister_socket_callback) APR_HOOK_LINK(mpm_get_name) APR_HOOK_LINK(end_generation) APR_HOOK_LINK(child_status) @@ -102,6 +106,13 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(int, mpm_query, AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_timed_callback, (apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton), (t, cbfn, baton), APR_ENOTIMPL) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback, + (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton), + (s, p, for_read, cbfn, baton), APR_ENOTIMPL) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_socket_callback, + (apr_socket_t **s, apr_pool_t *p), + (s, p), APR_ENOTIMPL) + AP_IMPLEMENT_HOOK_VOID(end_generation, (server_rec *s, ap_generation_t gen), (s, gen)) @@ -532,6 +543,14 @@ AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t, ap_mpm_cal { return ap_run_mpm_register_timed_callback(t, cbfn, baton); } +AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton) +{ + return ap_run_mpm_register_socket_callback(s, p, for_read, cbfn, baton); +} +AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p) +{ + return ap_run_mpm_unregister_socket_callback(s, p); +} AP_DECLARE(const char *)ap_show_mpm(void) { -- 2.40.0