]> granicus.if.org Git - apache/commitdiff
Work in Progress.
authorPaul Querna <pquerna@apache.org>
Fri, 27 Mar 2009 23:10:21 +0000 (23:10 +0000)
committerPaul Querna <pquerna@apache.org>
Fri, 27 Mar 2009 23:10:21 +0000 (23:10 +0000)
Add Clustered proxying support to mod_serf, by using the heartbeats system.

No preconfiguration of cluster members is needed.

Just a config like this:
    SerfCluster sweet heartbeat file=/var/cache/apache/hb.dat
    SerfCluster sour heartbeat file=/var/cache/apache/cluster2.dat
    <Location "/">
      SerfPass cluster://sweet
    </Location>
    <Location "/different_cluster">
      SerfPass cluster://sour
    </Location>

The location of all possible destination servers is provided by a new
providers interface, that includes configuration checking of the arguments to
the SerfCluster command, solving one of the worst problems with the mod_proxy
load balancer subsystem.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759386 13f79535-47bb-0310-9956-ffa450edef68

modules/proxy/mod_serf.c
modules/proxy/mod_serf.h [new file with mode: 0644]

index 7dfabb9ac033d6d794a6b5cdd1448837eb02c8fd..c790f6b8cec5a4299318a99553cdddf967267a8b 100644 (file)
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+#include "mod_serf.h"
+
 #include "httpd.h"
 #include "http_core.h"
 #include "http_config.h"
 
 #include "serf.h"
 #include "apr_uri.h"
+#include "apr_strings.h"
+
 
 module AP_MODULE_DECLARE_DATA serf_module;
 
 typedef struct {
     int on;
     apr_uri_t url;
-} serf_config_rec;
+} serf_config_t;
+
+typedef struct {
+  const char *name;
+  const char *provider;
+  apr_table_t *params;
+} serf_cluster_t;
+
+typedef struct {
+  /* name -> serf_cluster_t* */
+  apr_hash_t *clusters;
+} serf_server_config_t;
 
 typedef struct {
     int rstatus;
@@ -36,7 +51,7 @@ typedef struct {
     int done_headers;
     int keep_reading;
     request_rec *r;
-    serf_config_rec *conf;
+    serf_config_t *conf;
     serf_ssl_context_t *ssl_ctx;
     serf_bucket_alloc_t *bkt_alloc;
 } s_baton_t;
@@ -73,7 +88,7 @@ static serf_bucket_t* conn_setup(apr_socket_t *sock,
     return c;
 }
 
-int copy_headers_in(void *vbaton, const char *key, const char *value)
+static int copy_headers_in(void *vbaton, const char *key, const char *value)
 {
     serf_bucket_t *hdrs_bkt = (serf_bucket_t *)vbaton;
 
@@ -129,7 +144,7 @@ int copy_headers_in(void *vbaton, const char *key, const char *value)
     return 0;
 }
 
-int copy_headers_out(void *vbaton, const char *key, const char *value)
+static int copy_headers_out(void *vbaton, const char *key, const char *value)
 {
     s_baton_t *ctx = vbaton;
     int done = 0;
@@ -304,7 +319,8 @@ static apr_status_t setup_request(serf_request_t *request,
     return APR_SUCCESS;
 }
 
-static int drive_serf(request_rec *r, serf_config_rec *conf)
+/* TOOD: rewrite drive_serf to make it async */
+static int drive_serf(request_rec *r, serf_config_t *conf)
 {
     apr_status_t rv;
     apr_pool_t *pool = r->pool;
@@ -314,11 +330,73 @@ static int drive_serf(request_rec *r, serf_config_rec *conf)
     serf_context_t *serfme;
     serf_connection_t *conn;
     serf_request_t *srequest;
+    serf_server_config_t *ctx = 
+        (serf_server_config_t *)ap_get_module_config(r->server->module_config,
+                                                     &serf_module);
+    
+    if (strcmp(conf->url.scheme, "cluster") == 0) {
+        int rc;
+        ap_serf_cluster_provider_t *cp;
+        serf_cluster_t *cluster;
+        apr_array_header_t *servers = NULL;
+        ap_serf_server_t *choice;
+
+        /* TODO: could this be optimized in post-config to pre-setup the 
+         * pointers to the right cluster inside the conf structure?
+         */
+        cluster = apr_hash_get(ctx->clusters,
+                               conf->url.hostname,
+                               APR_HASH_KEY_STRING);
+        if (!cluster) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+                          "SerfCluster: unable to find cluster %s", conf->url.hostname);
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+
+        cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
+        
+        if (cp == NULL) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+                          "SerfCluster: unable to find provider %s", cluster->provider);
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+
+        if (cp->list_servers == NULL) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+                          "SerfCluster: %s is missing list servers provider.", cluster->provider);
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+        
+        rc = cp->list_servers(cp->baton,
+                              r,
+                              cluster->params,
+                              &servers);
+
+        if (rc != OK) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r,
+                          "SerfCluster: %s list servers returned failure", cluster->provider);
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
+
+        if (servers == NULL || apr_is_empty_array(servers)) {
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r,
+                          "SerfCluster: %s failed to provide a list of servers", cluster->provider);
+            return HTTP_INTERNAL_SERVER_ERROR;
+        }
 
-    /* XXXXX: cache dns? */
-    rv = apr_sockaddr_info_get(&address, conf->url.hostname,
-                               APR_UNSPEC, conf->url.port, 0,
-                               pool);
+        /* TOOD: restructure try all servers in the array !! */
+        choice = APR_ARRAY_IDX(servers, 0, ap_serf_server_t *);
+
+        rv = apr_sockaddr_info_get(&address, choice->ip,
+                                   APR_UNSPEC, choice->port, 0,
+                                   pool);
+    }
+    else {
+        /* XXXXX: cache dns? */
+        rv = apr_sockaddr_info_get(&address, conf->url.hostname,
+                                   APR_UNSPEC, conf->url.port, 0,
+                                   pool);
+    }
 
     if (rv != APR_SUCCESS) {
         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "Unable to resolve: %s", conf->url.hostname);
@@ -372,8 +450,8 @@ static int drive_serf(request_rec *r, serf_config_rec *conf)
 
 static int serf_handler(request_rec *r)
 {
-    serf_config_rec *conf = ap_get_module_config(r->per_dir_config,
-                                                 &serf_module);
+    serf_config_t *conf = ap_get_module_config(r->per_dir_config,
+                                               &serf_module);
 
     if (conf->on == 0) {
         return DECLINED;
@@ -386,7 +464,7 @@ static const char *add_pass(cmd_parms *cmd, void *vconf,
                             const char *vdest)
 {
     apr_status_t rv;
-    serf_config_rec *conf = (serf_config_rec *) vconf;
+    serf_config_t *conf = (serf_config_t *) vconf;
 
     rv = apr_uri_parse(cmd->pool, vdest, &conf->url);
 
@@ -408,32 +486,356 @@ static const char *add_pass(cmd_parms *cmd, void *vconf,
     return NULL;
 }
 
-static void *create_config(apr_pool_t *p, char *dummy)
+/* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
+
+static const char *add_cluster(cmd_parms *cmd, void *d,
+                               int argc, char *const argv[])
 {
-    serf_config_rec *new = (serf_config_rec *) apr_pcalloc(p, sizeof(serf_config_rec));
+    const char *rv;
+    ap_serf_cluster_provider_t *backend;
+    int i;
+    serf_cluster_t *cluster = NULL;
+    serf_server_config_t *ctx = 
+        (serf_server_config_t *)ap_get_module_config(cmd->server->module_config,
+                                                     &serf_module);
+
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+  
+    if (err != NULL) {
+        return err;
+    }
+
+    if (argc < 2) {
+        return "SerfCluster must have at least a name and provider.";
+    }
+    
+    cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t));
+    cluster->name = apr_pstrdup(cmd->pool, argv[0]);
+    cluster->provider = apr_pstrdup(cmd->pool, argv[1]);
+    cluster->params = apr_table_make(cmd->pool, 6);
+
+    backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
+    
+    if (backend == NULL) {
+        return apr_psprintf(cmd->pool, "SerfCluster: unable to find "
+                            "provider '%s'", cluster->provider);
+    }
+
+    for (i = 2; i < argc; i++) {
+        const char *p = argv[i];
+        const char *x = ap_strchr(p, '=');
+
+        if (x && strlen(p) > 1) {
+            apr_table_addn(cluster->params,
+                           apr_pstrndup(cmd->pool, p, x-p),
+                           x+1);
+        }
+        else {
+            apr_table_addn(cluster->params, 
+                           apr_pstrdup(cmd->pool, p),
+                           "");
+        }
+    }
+
+    if (backend->check_config == NULL) {
+        return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to "
+                             "provider a configuration checker",
+                            cluster->provider);
+    }
+
+    rv = backend->check_config(backend->baton, cmd, cluster->params);
+    
+    if (rv) {
+        return rv;
+    }
+
+    apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster);
+
+    return NULL;
+}
+
+static void *create_dir_config(apr_pool_t *p, char *dummy)
+{
+    serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t));
     new->on = 0;
     return new;
 }
 
+static void *create_server_config(apr_pool_t *p, server_rec *s)
+{
+    serf_server_config_t *ctx = 
+        (serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t));
+
+    ctx->clusters = apr_hash_make(p);
+
+    return ctx;
+}
+
+static void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv)
+{
+    serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t));
+    serf_server_config_t *base = (serf_server_config_t *) basev;
+    serf_server_config_t *overrides = (serf_server_config_t *) overridesv;
+    
+    ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters);
+    return ctx;
+}    
+
 static const command_rec serf_cmds[] =
 {
-    AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES/*making shit up*/,
-     "A prefix and destination"),
+    AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF,
+                      "Configure a cluster backend"),
+    AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES,
+                  "URL to reverse proxy to"),
     {NULL}
 };
 
+typedef struct hb_table_baton_t {
+    apr_pool_t *p;
+    const char *msg;
+} hb_table_baton_t;
+
+static int hb_table_check(void *rec, const char *key, const char *value)
+{
+    hb_table_baton_t *b = (hb_table_baton_t*)rec;
+    if (strcmp(key, "path") != 0) {
+        b->msg = apr_psprintf(b->p,
+                              "SerfCluster Heartbeat Invalid parameter '%s'",
+                              key);
+        return 1;
+    }
+
+    return 0;
+}
+
+static const char* hb_config_check(void *baton,
+                                   cmd_parms *cmd,
+                                   apr_table_t *params)
+{
+    hb_table_baton_t b;
+
+    if (apr_is_empty_table(params)) {
+        return "SerfCluster Heartbeat requires a path to the heartbat information.";
+    }
+    
+    b.p = cmd->pool;
+    b.msg = NULL;
+
+    apr_table_do(hb_table_check, &b, params, NULL);
+    
+    if (b.msg) {
+        return b.msg;
+    }
+
+    return NULL;
+}
+
+typedef struct hb_server_t {
+    const char *ip;
+    int busy;
+    int ready;
+    int seen;
+} hb_server_t;
+
+static void
+argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
+{
+    char *key;
+    char *value;
+    char *strtok_state;
+    
+    key = apr_strtok(str, "&", &strtok_state);
+    while (key) {
+        value = strchr(key, '=');
+        if (value) {
+            *value = '\0';      /* Split the string in two */
+            value++;            /* Skip passed the = */
+        }
+        else {
+            value = "1";
+        }
+        ap_unescape_url(key);
+        ap_unescape_url(value);
+        apr_table_set(parms, key, value);
+        key = apr_strtok(NULL, "&", &strtok_state);
+    }
+}
+
+static apr_status_t read_heartbeats(const char *path,
+                                    apr_array_header_t *servers,
+                                    apr_pool_t *pool)
+{
+    apr_finfo_t fi;
+    apr_status_t rv;
+    apr_file_t *fp;
+    
+    if (!path) {
+        return APR_SUCCESS;
+    }
+    
+    rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
+                       APR_OS_DEFAULT, pool);
+    
+    if (rv) {
+        return rv;
+    }
+    
+    rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
+    
+    if (rv) {
+        return rv;
+    }
+    
+    {
+        char *t;
+        int lineno = 0;
+        apr_table_t *hbt = apr_table_make(pool, 10);
+        char buf[4096];
+
+        while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) {
+            hb_server_t *server;
+            const char *ip;
+            lineno++;
+
+            /* comment */
+            if (buf[0] == '#') {
+                continue;
+            }
+            
+            
+            /* line format: <IP> <query_string>\n */
+            t = strchr(buf, ' ');
+            if (!t) {
+                continue;
+            }
+            
+            ip = apr_pstrndup(pool, buf, t - buf);
+            t++;
+            server = apr_pcalloc(pool, sizeof(hb_server_t));
+            server->ip = ip;
+            server->seen = -1;
+            apr_table_clear(hbt);
+            
+            argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
+            
+            if (apr_table_get(hbt, "busy")) {
+                server->busy = atoi(apr_table_get(hbt, "busy"));
+            }
+            
+            if (apr_table_get(hbt, "ready")) {
+                server->ready = atoi(apr_table_get(hbt, "ready"));
+            }
+            
+            if (apr_table_get(hbt, "lastseen")) {
+                server->seen = atoi(apr_table_get(hbt, "lastseen"));
+            }
+            
+            if (server->busy == 0 && server->ready != 0) {
+                /* Server has zero threads active, but lots of them ready, 
+                 * it likely just started up, so lets /4 the number ready, 
+                 * to prevent us from completely flooding it with all new 
+                 * requests.
+                 */
+                server->ready = server->ready / 4;
+            }
+
+            APR_ARRAY_PUSH(servers, hb_server_t *) = server;
+        }
+    }
+    
+    return APR_SUCCESS;
+}
+
+static int hb_server_sort(const void *a_, const void *b_)
+{
+    hb_server_t *a = (hb_server_t*)a;
+    hb_server_t *b = (hb_server_t*)b;
+    if (a->ready == b->ready) {
+        return 0;
+    }
+    else if (a->ready > b->ready) {
+        return -1;
+    }
+    else {
+        return 1;
+    }
+}
+
+static int hb_list_servers(void *baton,
+                           request_rec *r,
+                           apr_table_t *params,
+                           apr_array_header_t **out_servers)
+{
+    int i;
+    hb_server_t *hbs;
+    apr_status_t rv;
+    apr_pool_t *tpool;
+    apr_array_header_t *tmpservers;
+    apr_array_header_t *servers;
+    const char *path = apr_table_get(params, "path");
+
+    apr_pool_create(&tpool, r->pool);
+
+    tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *));
+    rv = read_heartbeats(path, tmpservers, tpool);
+
+    if (rv) {
+        ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+                      "SerfCluster: Heartbeat unable to read '%s'", path);
+        apr_pool_destroy(tpool);
+        return HTTP_INTERNAL_SERVER_ERROR;
+    }
+    
+    qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *),
+          hb_server_sort);
+
+    servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *));
+    for (i = 0;
+         i < tmpservers->nelts;
+         i++)
+    {
+        ap_serf_server_t *x;
+
+        hbs = APR_ARRAY_IDX(tmpservers, i, hb_server_t *);
+        if (hbs->ready > 0) {
+            x = apr_palloc(r->pool, sizeof(ap_serf_server_t));
+            x->ip = apr_pstrdup(r->pool, hbs->ip);
+            /* TODO: expand multicast format to support ports? */
+            x->port = 80;
+            APR_ARRAY_PUSH(servers, ap_serf_server_t *) = x;
+        }
+    }
+
+    *out_servers = servers;
+    apr_pool_destroy(tpool);
+    return OK;
+}
+
+static const ap_serf_cluster_provider_t builtin_heartbeat =
+{
+    "heartbeat",
+    NULL,
+    &hb_config_check,
+    &hb_list_servers,
+    NULL,
+    NULL
+};
+
 static void register_hooks(apr_pool_t *p)
 {
+    ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
+                         "heartbeat", "0", &builtin_heartbeat);
+
     ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST);
 }
 
 module AP_MODULE_DECLARE_DATA serf_module =
 {
     STANDARD20_MODULE_STUFF,
-    create_config,
-    NULL,
-    NULL,
+    create_dir_config,
     NULL,
+    create_server_config,
+    merge_server_config,
     serf_cmds,
     register_hooks
 };
diff --git a/modules/proxy/mod_serf.h b/modules/proxy/mod_serf.h
new file mode 100644 (file)
index 0000000..80f40c6
--- /dev/null
@@ -0,0 +1,102 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * @file mod_serf.h
+ * @brief Serf Interfaces
+ *
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "ap_provider.h"
+
+#ifndef _MOD_SERF_H_
+#define _MOD_SERF_H_
+/**
+ * @addtogroup Serf_cluster_provider
+ * @{
+ */
+#define AP_SERF_CLUSTER_PROVIDER "serf_cluster"
+typedef struct ap_serf_server_t ap_serf_server_t;
+struct ap_serf_server_t {
+    /* TOOD: consider using apr_sockaddr_t, except they suck. */
+    const char *ip;
+    apr_port_t port;
+};
+
+typedef struct ap_serf_cluster_provider_t ap_serf_cluster_provider_t;
+struct ap_serf_cluster_provider_t {
+    /**
+     * Human readable name of this provider, used in configuration.
+     */
+    const char *name;
+    /**
+     * Baton passed to all methods in this provider.
+     *
+     * This field may be NULL.
+     */
+    void *baton;
+    /**
+     * Check that the key/value pairs used to configure the 
+     * cluster are valid.
+     *
+     * Return non-NULL on failure with an error message, like standard httpd
+     * configuration directives.
+     *
+     * This field must be set.
+     */
+    const char* (*check_config)(void *baton,
+                                cmd_parms *cmd,
+                                apr_table_t *params);
+    /**
+     * Provide an ordered array of ap_serf_server_t in the order that
+     * mod_serf should attempt to use them.  If a server on the list
+     * is known to be not responding, it may be skipped.  If mod_serf is 
+     * unable to contact any of the servers, a 502 will be returned to the 
+     * client.
+     *
+     * Returns OK on sucess, all other return codes will result in a 500.
+     *
+     * This field must be set.
+     */
+    int (*list_servers)(void *baton,
+                        request_rec *r,
+                        apr_table_t *params,
+                        apr_array_header_t **servers);
+    /**
+     * If a request was successfully fulfilled by this address, feedback will
+     * be given to the provider, so it may make better recommendations.
+     *
+     * This field may be NULL.
+     */
+    void (*server_success)(void *baton, request_rec *r, apr_table_t *params,
+                           ap_serf_server_t* server);
+    /**
+     * If a request failed to be fulfilled by this address, feedback will
+     * be given to the provider, so it may make better recommendations.
+     *
+     * This field may be NULL.
+     */
+    void (*server_failure)(void *baton, request_rec *r, apr_table_t *params,
+                           ap_serf_server_t* server);
+
+};
+/** @} */
+
+#endif /* _MOD_SERF_H_ */
+