]> granicus.if.org Git - apache/commitdiff
Fold in Eric Covener's socket callback
authorJim Jagielski <jim@apache.org>
Mon, 17 Jun 2013 12:43:52 +0000 (12:43 +0000)
committerJim Jagielski <jim@apache.org>
Mon, 17 Jun 2013 12:43:52 +0000 (12:43 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1493741 13f79535-47bb-0310-9956-ffa450edef68

include/ap_mpm.h
include/mpm_common.h
modules/proxy/mod_proxy_wstunnel.c
server/mpm/event/event.c
server/mpm_common.c

index 71f8f47caa23d8630e34d807ee71eefebda49653..6b345ed6ae4f845563380ee3998b0f180edfe143 100644 (file)
@@ -167,6 +167,29 @@ AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t,
                                                        ap_mpm_callback_fn_t *cbfn,
                                                        void *baton);
 
+/**
+ * Register a callback on the readability or writability on a group of sockets
+ * @param s Null-terminated list of sockets
+ * @param p pool for use between registration and callback
+ * @param for_read Whether the sockets are monitored for read or writability
+ * @param cbfn The callback function
+ * @param baton userdata for the callback function
+ * @return APR_SUCCESS if all sockets could be added to a pollset, 
+ * APR_ENOTIMPL if no asynch support, or an apr_pollset_add error.
+ * @remark When activity is found on any 1 socket in the list, all are removed 
+ * from the pollset and only 1 callback is issued.
+ * @fn apr_status_t (p_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)
+ */
+
+AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s,
+                                                         apr_pool_t *p,
+                                                         int for_read, 
+                                                         ap_mpm_callback_fn_t *cbfn,
+                                                         void *baton);
+
+AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, 
+                                                           apr_pool_t *p);
+
 typedef enum mpm_child_status {
     MPM_CHILD_STARTED,
     MPM_CHILD_EXITED,
index c555b72c2f9ad061adcf319596eaf662feadd6c7..8ba20d2ce23887c96e4e3a504ce04287e141c017 100644 (file)
@@ -409,6 +409,13 @@ AP_DECLARE_HOOK(int, mpm_query, (int query_code, int *result, apr_status_t *rv))
 AP_DECLARE_HOOK(apr_status_t, mpm_register_timed_callback,
                 (apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton))
 
+/* register the specified callback */
+AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback,
+                (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton))
+/* unregister the specified callback */
+AP_DECLARE_HOOK(apr_status_t, mpm_unregister_socket_callback,
+                (apr_socket_t **s, apr_pool_t *p))
+
 /* get MPM name (e.g., "prefork" or "event") */
 AP_DECLARE_HOOK(const char *,mpm_get_name,(void))
 
index 365a20549e7957989b685aacd942225ee66a02a5..e0fbc142a8fa4c5c3968b5cce08e08f0981e9210 100644 (file)
  */
 
 #include "mod_proxy.h"
+#include "ap_mpm.h"
 
 module AP_MODULE_DECLARE_DATA proxy_wstunnel_module;
 
+typedef struct ws_baton_t {
+    request_rec *r;
+    proxy_conn_rec *proxy_connrec;
+    apr_socket_t *server_soc;
+    apr_socket_t *client_soc;
+    apr_pollset_t *pollset;
+    apr_bucket_brigade *bb;
+    int is_client;
+    apr_pool_t *subpool;
+} ws_baton_t;
+
+static int proxy_wstunnel_transfer(request_rec *r, conn_rec *c_i, conn_rec *c_o,
+                                     apr_bucket_brigade *bb, char *name);
+
+static int proxy_wstunnel_pump(ws_baton_t *baton, apr_time_t timeout) {
+    int client_error = 0;
+    request_rec *r = baton->r;
+    conn_rec *c = r->connection;
+    proxy_conn_rec *conn = baton->proxy_connrec;
+    apr_socket_t *sock = conn->sock;
+    conn_rec *backconn = conn->connection;
+    const apr_pollfd_t *signalled;
+    apr_int32_t pollcnt, pi;
+    apr_int16_t pollevent;
+    apr_pollset_t *pollset = baton->pollset;
+    apr_socket_t *client_socket = baton->client_soc;
+    apr_status_t rv;
+    apr_bucket_brigade *bb = baton->bb;
+
+    while(1) { 
+        if ((rv = apr_pollset_poll(pollset, timeout, &pollcnt, &signalled))
+                != APR_SUCCESS) {
+            if (APR_STATUS_IS_EINTR(rv)) {
+                continue;
+            }
+            else if (APR_STATUS_IS_TIMEUP(rv)) { 
+                ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Attempting to go asynch");
+                return SUSPENDED;
+            }
+            else { 
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()");
+                return HTTP_INTERNAL_SERVER_ERROR;
+            }
+        }
+
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445)
+                "woke from poll(), i=%d", pollcnt);
+
+        for (pi = 0; pi < pollcnt; pi++) {
+            const apr_pollfd_t *cur = &signalled[pi];
+
+            if (cur->desc.s == sock) {
+                pollevent = cur->rtnevents;
+                if (pollevent & APR_POLLIN) {
+                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446)
+                            "sock was readable");
+                    rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock");
+                }
+                else if ((pollevent & APR_POLLERR)
+                        || (pollevent & APR_POLLHUP)) {
+                    rv = APR_EPIPE;
+                    ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447)
+                            "err/hup on backconn");
+                }
+                if (rv != APR_SUCCESS)
+                    client_error = 1;
+            }
+            else if (cur->desc.s == client_socket) {
+                pollevent = cur->rtnevents;
+                if (pollevent & APR_POLLIN) {
+                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448)
+                            "client was readable");
+                    rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client");
+                }
+            }
+            else {
+                rv = APR_EBADF;
+                ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449)
+                        "unknown socket in pollset");
+            }
+
+        }
+        if (rv != APR_SUCCESS) {
+            break;
+        }
+
+    }
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+            "finished with poll() - cleaning up");
+
+    if (client_error) {
+        return HTTP_INTERNAL_SERVER_ERROR;
+    }
+    return OK;
+}
+
+static void proxy_wstunnel_callback(void *b) { 
+    int status;
+    ws_baton_t *baton = (ws_baton_t*)b;
+    apr_thread_mutex_lock(baton->r->invoke_mtx);
+    apr_pool_clear(baton->subpool);
+    status = proxy_wstunnel_pump(baton, apr_time_from_sec(5));
+    if (status == SUSPENDED) {
+        apr_socket_t *sockets[3] = {baton->client_soc,  baton->server_soc, NULL};
+        ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+    }
+    else {
+        apr_socket_t *sockets[3] = {baton->client_soc,  baton->server_soc, NULL};
+        ap_mpm_unregister_socket_callback(sockets, baton->subpool);
+        apr_thread_mutex_unlock(baton->r->invoke_mtx);
+        ap_finalize_request_protocol(baton->r);
+        ap_process_request_after_handler(baton->r);
+        return;
+    }
+    apr_thread_mutex_unlock(baton->r->invoke_mtx);
+}
+
 /*
  * Canonicalise http-like URLs.
  * scheme is the scheme for the URL
@@ -172,13 +290,9 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
     apr_status_t rv = APR_SUCCESS;
     apr_pollset_t *pollset;
     apr_pollfd_t pollfd;
-    const apr_pollfd_t *signalled;
-    apr_int32_t pollcnt, pi;
-    apr_int16_t pollevent;
     conn_rec *c = r->connection;
     apr_socket_t *sock = conn->sock;
     conn_rec *backconn = conn->connection;
-    int client_error = 0;
     char *buf;
     apr_bucket_brigade *header_brigade;
     apr_bucket *e;
@@ -186,6 +300,8 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
     char *old_te_val = NULL;
     apr_bucket_brigade *bb = apr_brigade_create(p, c->bucket_alloc);
     apr_socket_t *client_socket = ap_get_conn_socket(c);
+    ws_baton_t *baton = apr_pcalloc(r->pool, sizeof(ws_baton_t));
+    int status;
 
     header_brigade = apr_brigade_create(p, backconn->bucket_alloc);
 
@@ -240,66 +356,35 @@ static int ap_proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
 
     remove_reqtimeout(r->input_filters);
 
-    while (1) { /* Infinite loop until error (one side closes the connection) */
-        if ((rv = apr_pollset_poll(pollset, -1, &pollcnt, &signalled))
-            != APR_SUCCESS) {
-            if (APR_STATUS_IS_EINTR(rv)) {
-                continue;
-            }
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02444) "error apr_poll()");
-            return HTTP_INTERNAL_SERVER_ERROR;
+    baton->r = r;
+    baton->pollset = pollset;
+    baton->client_soc = client_socket;
+    baton->server_soc = sock;
+    baton->proxy_connrec = conn;
+    baton->bb = bb;
+    apr_pool_create(&baton->subpool, r->pool);
+
+    status = proxy_wstunnel_pump(baton, apr_time_from_sec(5)); 
+    if (status == SUSPENDED) {
+        apr_socket_t *sockets[3] = {baton->client_soc,  baton->server_soc, NULL};
+        status = ap_mpm_register_socket_callback(sockets, baton->subpool, 1, proxy_wstunnel_callback, baton);
+        if (status == APR_SUCCESS) { 
+            return SUSPENDED;
         }
-        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02445)
-                      "woke from poll(), i=%d", pollcnt);
-
-        for (pi = 0; pi < pollcnt; pi++) {
-            const apr_pollfd_t *cur = &signalled[pi];
-
-            if (cur->desc.s == sock) {
-                pollevent = cur->rtnevents;
-                if (pollevent & APR_POLLIN) {
-                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02446)
-                                  "sock was readable");
-                    rv = proxy_wstunnel_transfer(r, backconn, c, bb, "sock");
-                    }
-                else if ((pollevent & APR_POLLERR)
-                         || (pollevent & APR_POLLHUP)) {
-                         rv = APR_EPIPE;
-                         ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, APLOGNO(02447)
-                                       "err/hup on backconn");
-                }
-                if (rv != APR_SUCCESS)
-                    client_error = 1;
-            }
-            else if (cur->desc.s == client_socket) {
-                pollevent = cur->rtnevents;
-                if (pollevent & APR_POLLIN) {
-                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(02448)
-                                  "client was readable");
-                    rv = proxy_wstunnel_transfer(r, c, backconn, bb, "client");
-                }
-            }
-            else {
-                rv = APR_EBADF;
-                ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, APLOGNO(02449)
-                              "unknown socket in pollset");
-            }
-
+        else if (status == APR_ENOTIMPL) { 
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "No asynch support");
+            status = proxy_wstunnel_pump(baton, -1);
         }
-        if (rv != APR_SUCCESS) {
-            break;
+        else { 
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, status, r,
+                          "error creating websosckets tunnel");
+            return HTTP_INTERNAL_SERVER_ERROR;
         }
     }
 
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
-                  "finished with poll() - cleaning up");
-
-    if (client_error) {
-        return HTTP_INTERNAL_SERVER_ERROR;
-    }
-    return OK;
-}
-
+    return status;
+}    
+    
 /*
  */
 static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker,
@@ -378,7 +463,9 @@ static int proxy_wstunnel_handler(request_rec *r, proxy_worker *worker,
     }
 
     /* Do not close the socket */
-    ap_proxy_release_connection(scheme, backend, r->server);
+    if (status != SUSPENDED) { 
+        ap_proxy_release_connection(scheme, backend, r->server);
+    }
     return status;
 }
 
index f013cf5ec196e62b2fac4de04bd839f73db1fdfb..004f52309944ab7293009913bc7942f1fcd61e15 100644 (file)
@@ -295,6 +295,7 @@ typedef enum
 #if HAVE_SERF
     , PT_SERF
 #endif
+    , PT_USER
 } poll_type_e;
 
 typedef struct
@@ -303,6 +304,15 @@ typedef struct
     void *baton;
 } listener_poll_type;
 
+typedef struct
+{
+ ap_mpm_callback_fn_t *cbfunc;
+ void *user_baton; 
+ apr_pollfd_t **pfds;
+ int nsock;
+ int signaled;
+} socket_callback_baton_t;
+
 /* data retained by event across load/unload of the module
  * allocated on first call to pre-config hook; located on
  * subsequent calls to pre-config hook
@@ -1340,6 +1350,71 @@ static apr_status_t event_register_timed_callback(apr_time_t t,
     return APR_SUCCESS;
 }
 
+
+static apr_status_t event_register_socket_callback(apr_socket_t **s, 
+                                                  apr_pool_t *p, 
+                                                  int for_read,
+                                                  ap_mpm_callback_fn_t *cbfn,
+                                                  void *baton)
+{
+    apr_status_t rc, final_rc= APR_SUCCESS;
+    int i = 0, nsock;
+    socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb));
+    listener_poll_type *pt = apr_palloc(p, sizeof(*pt));
+    apr_pollfd_t **pfds = NULL;
+
+    while(s[i] != NULL) { 
+        i++; 
+    }
+    nsock = i;
+    pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*));
+
+    pt->type = PT_USER;
+    pt->baton = scb;
+
+    scb->cbfunc = cbfn;
+    scb->user_baton = baton;
+    scb->nsock = nsock;
+    scb->pfds = pfds;
+
+    for (i = 0; i<nsock; i++) { 
+        pfds[i] = apr_palloc(p, sizeof(apr_pollfd_t));
+        pfds[i]->desc_type = APR_POLL_SOCKET;
+        pfds[i]->reqevents = (for_read ? APR_POLLIN : APR_POLLOUT) | APR_POLLERR | APR_POLLHUP;
+        pfds[i]->desc.s = s[i];
+        pfds[i]->client_data = pt;
+        rc = apr_pollset_add(event_pollset, pfds[i]);
+        if (rc != APR_SUCCESS) final_rc = rc;
+    }
+    return final_rc;
+}
+static apr_status_t event_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
+{
+    int i = 0, nsock;
+    apr_status_t final_rc = APR_SUCCESS;
+    apr_pollfd_t *pfd = apr_palloc(p, sizeof(*pfd));
+    apr_pollfd_t **pfds = NULL;
+
+    while(s[i] != NULL) { 
+        i++; 
+    }
+    nsock = i;
+    pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*));
+
+    for (i = 0; i<nsock; i++) { 
+        apr_status_t rc;
+        pfds[i] = apr_pcalloc(p, sizeof(apr_pollfd_t));
+        pfds[i]->desc_type = APR_POLL_SOCKET;
+        pfds[i]->reqevents = APR_POLLERR | APR_POLLHUP;
+        pfds[i]->desc.s = s[i];
+        pfds[i]->client_data = NULL;
+        rc = apr_pollset_remove(event_pollset, pfds[i]);
+        if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) final_rc = APR_SUCCESS;
+    }
+    return final_rc;
+}
+
 /*
  * Close socket and clean up if remote closed its end while we were in
  * lingering close.
@@ -1722,7 +1797,40 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 /* XXXX: this doesn't require get_worker() */
                 serf_event_trigger(g_serf, pt->baton, out_pfd);
             }
+
 #endif
+            else if (pt->type == PT_USER) {
+                /* masquerade as a timer event that is firing */
+                timer_event_t *te; 
+                int i = 0;
+                socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton;
+
+                if (!baton->signaled) { 
+                    baton->signaled = 1;
+                    apr_thread_mutex_lock(g_timer_skiplist_mtx);
+
+                    if (!APR_RING_EMPTY(&timer_free_ring, timer_event_t, link)) {
+                        te = APR_RING_FIRST(&timer_free_ring);
+                        APR_RING_REMOVE(te, link);
+                    }
+                    else {
+                        te = ap_skiplist_alloc(timer_skiplist, sizeof(timer_event_t));
+                        APR_RING_ELEM_INIT(te, link);
+                    }
+                    apr_thread_mutex_unlock(g_timer_skiplist_mtx);
+
+                    for (i = 0; i < baton->nsock ; i++) { 
+                        apr_pollset_remove(event_pollset, baton->pfds[i]);
+                    }
+
+                    te->cbfunc = baton->cbfunc;
+                    te->baton = baton->user_baton;
+                    te->when = -1;
+
+                    push_timer2worker(te);
+                }
+                apr_pollset_remove(event_pollset, out_pfd);
+            }
             out_pfd++;
             num--;
         }                   /* while for processing poll */
@@ -3246,6 +3354,11 @@ static void event_hooks(apr_pool_t * p)
     ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE);
     ap_hook_mpm_register_timed_callback(event_register_timed_callback, NULL, NULL,
                                         APR_HOOK_MIDDLE);
+    ap_hook_mpm_register_socket_callback(event_register_socket_callback, NULL, NULL,
+                                        APR_HOOK_MIDDLE);
+    ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL,
+                                        APR_HOOK_MIDDLE);
     ap_hook_mpm_get_name(event_get_name, NULL, NULL, APR_HOOK_MIDDLE);
 }
 
index e33bd11c9eb88637b79ba3aca2f8edad59999ef5..71e611356f1c285a5fb7c45ee3bd9065499aa5ad 100644 (file)
@@ -70,6 +70,8 @@ APR_HOOK_STRUCT(
     APR_HOOK_LINK(mpm)
     APR_HOOK_LINK(mpm_query)
     APR_HOOK_LINK(mpm_register_timed_callback)
+    APR_HOOK_LINK(mpm_register_socket_callback)
+    APR_HOOK_LINK(mpm_unregister_socket_callback)
     APR_HOOK_LINK(mpm_get_name)
     APR_HOOK_LINK(end_generation)
     APR_HOOK_LINK(child_status)
@@ -83,6 +85,8 @@ APR_HOOK_STRUCT(
     APR_HOOK_LINK(mpm)
     APR_HOOK_LINK(mpm_query)
     APR_HOOK_LINK(mpm_register_timed_callback)
+    APR_HOOK_LINK(mpm_register_socket_callback)
+    APR_HOOK_LINK(mpm_unregister_socket_callback)
     APR_HOOK_LINK(mpm_get_name)
     APR_HOOK_LINK(end_generation)
     APR_HOOK_LINK(child_status)
@@ -102,6 +106,13 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(int, mpm_query,
 AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_timed_callback,
                             (apr_time_t t, ap_mpm_callback_fn_t *cbfn, void *baton),
                             (t, cbfn, baton), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback,
+                            (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton),
+                            (s, p, for_read, cbfn, baton), APR_ENOTIMPL)
+AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_socket_callback,
+                            (apr_socket_t **s, apr_pool_t *p),
+                            (s, p), APR_ENOTIMPL)
+
 AP_IMPLEMENT_HOOK_VOID(end_generation,
                        (server_rec *s, ap_generation_t gen),
                        (s, gen))
@@ -532,6 +543,14 @@ AP_DECLARE(apr_status_t) ap_mpm_register_timed_callback(apr_time_t t, ap_mpm_cal
 {
     return ap_run_mpm_register_timed_callback(t, cbfn, baton);
 }
+AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)
+{
+    return ap_run_mpm_register_socket_callback(s, p, for_read, cbfn, baton);
+}
+AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p)
+{
+    return ap_run_mpm_unregister_socket_callback(s, p);
+}
 
 AP_DECLARE(const char *)ap_show_mpm(void)
 {