]> granicus.if.org Git - apache/commitdiff
If serf is available, compile in driving the serf event loop from inside the
authorPaul Querna <pquerna@apache.org>
Sat, 28 Mar 2009 01:00:41 +0000 (01:00 +0000)
committerPaul Querna <pquerna@apache.org>
Sat, 28 Mar 2009 01:00:41 +0000 (01:00 +0000)
Event MPM.

Add a new MPM Query to determine if an MPM supports this.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759413 13f79535-47bb-0310-9956-ffa450edef68

include/ap_mpm.h
server/mpm/event/event.c

index 43d341ea2caf5e7cb31c6e5f42315efd4f667755..15c37c906f6db4b105ddcf4e16b6a2dcdf648f6f 100644 (file)
@@ -144,6 +144,7 @@ AP_DECLARE(apr_status_t) ap_os_create_privileged_process(
 #define AP_MPMQ_MPM_STATE            13  /* starting, running, stopping  */
 #define AP_MPMQ_IS_ASYNC             14  /* MPM can process async connections  */
 #define AP_MPMQ_GENERATION           15  /* MPM generation */
+#define AP_MPMQ_HAS_SERF             16  /* MPM can drive serf internally  */
 
 /**
  * Query a property of the current MPM.  
index f6b409d750eb1d222eaf727ffc31333bcbb76f6e..71c7bd90fb5de3fcdc4e5b5fafe13a0f6543ab43 100644 (file)
 #include <signal.h>
 #include <limits.h>             /* for INT_MAX */
 
+#include "mod_serf.h"
+
+#if AP_HAS_SERF
+#include "serf.h"
+#endif
+
 /* Limit on the total --- clients will be locked out if more servers than
  * this are needed.  It is intended solely to keep the server from crashing
  * when things get out of hand.
@@ -168,6 +174,15 @@ static struct timeout_head_t timeout_head, keepalive_timeout_head;
 
 static apr_pollset_t *event_pollset;
 
+#if AP_HAS_SERF
+typedef struct {
+    apr_pollset_t *pollset;
+    apr_pool_t *pool;
+} s_baton_t;
+
+static serf_context_t *g_serf;
+#endif
+
 /* The structure used to pass unique initialization info to each thread */
 typedef struct
 {
@@ -191,6 +206,9 @@ typedef enum
 {
     PT_CSD,
     PT_ACCEPT
+#if AP_HAS_SERF
+    , PT_SERF
+#endif
 } poll_type_e;
 
 typedef struct
@@ -346,6 +364,9 @@ static apr_status_t event_query(int query_code, int *result)
     case AP_MPMQ_IS_ASYNC:
         *result = 1;
         return APR_SUCCESS;
+    case AP_MPMQ_HAS_SERF:
+        *result = 1;
+        return APR_SUCCESS;
     case AP_MPMQ_HARD_LIMIT_DAEMONS:
         *result = server_limit;
         return APR_SUCCESS;
@@ -782,8 +803,37 @@ static void dummy_signal_handler(int sig)
      */
 }
 
+
+#if AP_HAS_SERF
+static apr_status_t s_socket_add(void *user_baton,
+                                 apr_pollfd_t *pfd,
+                                 void *serf_baton)
+{
+    s_baton_t *s = (s_baton_t*)user_baton;
+    /* XXXXX: recycle listener_poll_types */
+    listener_poll_type *pt = malloc(sizeof(*pt));
+    pt->type = PT_SERF;
+    pt->baton = serf_baton;
+    pfd->client_data = pt;
+    return apr_pollset_add(s->pollset, pfd);
+}
+
+static apr_status_t s_socket_remove(void *user_baton,
+                                    apr_pollfd_t *pfd,
+                                    void *serf_baton)
+{
+    s_baton_t *s = (s_baton_t*)user_baton;
+    listener_poll_type *pt = pfd->client_data;
+    free(pt);
+    return apr_pollset_remove(s->pollset, pfd);
+}
+#endif
+
 static apr_status_t init_pollset(apr_pool_t *p)
 {
+#if AP_HAS_SERF
+    s_baton_t *baton = NULL;
+#endif
     apr_status_t rv;
     ap_listen_rec *lr;
     listener_poll_type *pt;
@@ -826,6 +876,21 @@ static apr_status_t init_pollset(apr_pool_t *p)
         lr->accept_func = ap_unixd_accept;
     }
 
+#if AP_HAS_SERF
+    baton = apr_pcalloc(p, sizeof(*baton));
+    baton->pollset = event_pollset;
+    /* TODO: subpools, threads, reuse, etc.  -- currently use malloc() inside :( */
+    baton->pool = p;
+
+    g_serf = serf_context_create_ex(baton,
+                                    s_socket_add,
+                                    s_socket_remove, p);
+
+    ap_register_provider(p, "mpm_serf",
+                         "instance", "0", g_serf);
+
+#endif
+
     return APR_SUCCESS;
 }
 
@@ -1042,6 +1107,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             apr_thread_mutex_unlock(g_timer_ring_mtx);
         }
 
+#if AP_HAS_SERF
+        rc = serf_context_prerun(g_serf);
+        if (rc != APR_SUCCESS) {
+            /* TOOD: what should do here? ugh. */
+        }
+        
+#endif
         rc = apr_pollset_poll(event_pollset, timeout_interval, &num,
                               &out_pfd);
 
@@ -1112,7 +1184,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     have_idle_worker = 0;
                 }
             }
-            else {
+            else if (pt->type == PT_ACCEPT) {
                 /* A Listener Socket is ready for an accept() */
 
                 lr = (ap_listen_rec *) pt->baton;
@@ -1136,7 +1208,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                         return NULL;
                     }
                 }
-
                 apr_pool_tag(ptrans, "transaction");
 
                 rc = lr->accept_func(&csd, lr, ptrans);
@@ -1174,6 +1245,13 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     ap_push_pool(worker_queue_info, ptrans);
                 }
             }               /* if:else on pt->type */
+#if AP_HAS_SERF
+            else if (pt->type == PT_SERF) {
+                /* send socket to serf. */
+                /* XXXX: this doesn't require get_worker(&have_idle_worker) */
+                serf_event_trigger(g_serf, pt->baton, out_pfd);
+            }
+#endif
             out_pfd++;
             num--;
         }                   /* while for processing poll */