From: Paul Querna Date: Fri, 27 Mar 2009 23:10:21 +0000 (+0000) Subject: Work in Progress. X-Git-Tag: 2.3.3~781 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=3d379b20f574c9d14968da6a75f581c82a853e6d;p=apache Work in Progress. Add Clustered proxying support to mod_serf, by using the heartbeats system. No preconfiguration of cluster members is needed. Just a config like this: SerfCluster sweet heartbeat file=/var/cache/apache/hb.dat SerfCluster sour heartbeat file=/var/cache/apache/cluster2.dat SerfPass cluster://sweet SerfPass cluster://sour The location of all possible destination servers is provided by a new providers interface, that includes configuration checking of the arguments to the SerfCluster command, solving one of the worst problems with the mod_proxy load balancer subsystem. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759386 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/proxy/mod_serf.c b/modules/proxy/mod_serf.c index 7dfabb9ac0..c790f6b8ce 100644 --- a/modules/proxy/mod_serf.c +++ b/modules/proxy/mod_serf.c @@ -14,6 +14,8 @@ * limitations under the License. 
*/ +#include "mod_serf.h" + #include "httpd.h" #include "http_core.h" #include "http_config.h" @@ -22,13 +24,26 @@ #include "serf.h" #include "apr_uri.h" +#include "apr_strings.h" + module AP_MODULE_DECLARE_DATA serf_module; typedef struct { int on; apr_uri_t url; -} serf_config_rec; +} serf_config_t; + +typedef struct { + const char *name; + const char *provider; + apr_table_t *params; +} serf_cluster_t; + +typedef struct { + /* name -> serf_cluster_t* */ + apr_hash_t *clusters; +} serf_server_config_t; typedef struct { int rstatus; @@ -36,7 +51,7 @@ typedef struct { int done_headers; int keep_reading; request_rec *r; - serf_config_rec *conf; + serf_config_t *conf; serf_ssl_context_t *ssl_ctx; serf_bucket_alloc_t *bkt_alloc; } s_baton_t; @@ -73,7 +88,7 @@ static serf_bucket_t* conn_setup(apr_socket_t *sock, return c; } -int copy_headers_in(void *vbaton, const char *key, const char *value) +static int copy_headers_in(void *vbaton, const char *key, const char *value) { serf_bucket_t *hdrs_bkt = (serf_bucket_t *)vbaton; @@ -129,7 +144,7 @@ int copy_headers_in(void *vbaton, const char *key, const char *value) return 0; } -int copy_headers_out(void *vbaton, const char *key, const char *value) +static int copy_headers_out(void *vbaton, const char *key, const char *value) { s_baton_t *ctx = vbaton; int done = 0; @@ -304,7 +319,8 @@ static apr_status_t setup_request(serf_request_t *request, return APR_SUCCESS; } -static int drive_serf(request_rec *r, serf_config_rec *conf) +/* TODO: rewrite drive_serf to make it async */ +static int drive_serf(request_rec *r, serf_config_t *conf) { apr_status_t rv; apr_pool_t *pool = r->pool; @@ -314,11 +330,73 @@ static int drive_serf(request_rec *r, serf_config_rec *conf) serf_context_t *serfme; serf_connection_t *conn; serf_request_t *srequest; + serf_server_config_t *ctx = + (serf_server_config_t *)ap_get_module_config(r->server->module_config, + &serf_module); + + if (strcmp(conf->url.scheme, "cluster") == 0) { + int rc; + 
ap_serf_cluster_provider_t *cp; + serf_cluster_t *cluster; + apr_array_header_t *servers = NULL; + ap_serf_server_t *choice; + + /* TODO: could this be optimized in post-config to pre-setup the + * pointers to the right cluster inside the conf structure? + */ + cluster = apr_hash_get(ctx->clusters, + conf->url.hostname, + APR_HASH_KEY_STRING); + if (!cluster) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, + "SerfCluster: unable to find cluster %s", conf->url.hostname); + return HTTP_INTERNAL_SERVER_ERROR; + } + + cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0"); + + if (cp == NULL) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, + "SerfCluster: unable to find provider %s", cluster->provider); + return HTTP_INTERNAL_SERVER_ERROR; + } + + if (cp->list_servers == NULL) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, + "SerfCluster: %s is missing list servers provider.", cluster->provider); + return HTTP_INTERNAL_SERVER_ERROR; + } + + rc = cp->list_servers(cp->baton, + r, + cluster->params, + &servers); + + if (rc != OK) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, + "SerfCluster: %s list servers returned failure", cluster->provider); + return HTTP_INTERNAL_SERVER_ERROR; + } + + if (servers == NULL || apr_is_empty_array(servers)) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, + "SerfCluster: %s failed to provide a list of servers", cluster->provider); + return HTTP_INTERNAL_SERVER_ERROR; + } - /* XXXXX: cache dns? */ - rv = apr_sockaddr_info_get(&address, conf->url.hostname, - APR_UNSPEC, conf->url.port, 0, - pool); + /* TODO: restructure to try all servers in the array !! */ + choice = APR_ARRAY_IDX(servers, 0, ap_serf_server_t *); + + rv = apr_sockaddr_info_get(&address, choice->ip, + APR_UNSPEC, choice->port, 0, + pool); + } + else { + /* XXXXX: cache dns? 
*/ + rv = apr_sockaddr_info_get(&address, conf->url.hostname, + APR_UNSPEC, conf->url.port, 0, + pool); + } if (rv != APR_SUCCESS) { ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "Unable to resolve: %s", conf->url.hostname); @@ -372,8 +450,8 @@ static int drive_serf(request_rec *r, serf_config_rec *conf) static int serf_handler(request_rec *r) { - serf_config_rec *conf = ap_get_module_config(r->per_dir_config, - &serf_module); + serf_config_t *conf = ap_get_module_config(r->per_dir_config, + &serf_module); if (conf->on == 0) { return DECLINED; @@ -386,7 +464,7 @@ static const char *add_pass(cmd_parms *cmd, void *vconf, const char *vdest) { apr_status_t rv; - serf_config_rec *conf = (serf_config_rec *) vconf; + serf_config_t *conf = (serf_config_t *) vconf; rv = apr_uri_parse(cmd->pool, vdest, &conf->url); @@ -408,32 +486,356 @@ static const char *add_pass(cmd_parms *cmd, void *vconf, return NULL; } -static void *create_config(apr_pool_t *p, char *dummy) +/* SerfCluster NAME PROVIDER [key=value params passed to the provider ...] */ + +static const char *add_cluster(cmd_parms *cmd, void *d, + int argc, char *const argv[]) { - serf_config_rec *new = (serf_config_rec *) apr_pcalloc(p, sizeof(serf_config_rec)); + const char *rv; + ap_serf_cluster_provider_t *backend; + int i; + serf_cluster_t *cluster = NULL; + serf_server_config_t *ctx = + (serf_server_config_t *)ap_get_module_config(cmd->server->module_config, + &serf_module); + + const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY); + + if (err != NULL) { + return err; + } + + if (argc < 2) { + return "SerfCluster must have at least a name and provider."; + } + + cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t)); + cluster->name = apr_pstrdup(cmd->pool, argv[0]); + cluster->provider = apr_pstrdup(cmd->pool, argv[1]); + cluster->params = apr_table_make(cmd->pool, 6); + + backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0"); + + if (backend == NULL) { + return apr_psprintf(cmd->pool, "SerfCluster: unable to find " + "provider '%s'", 
cluster->provider); + } + + for (i = 2; i < argc; i++) { + const char *p = argv[i]; + const char *x = ap_strchr(p, '='); + + if (x && strlen(p) > 1) { + apr_table_addn(cluster->params, + apr_pstrndup(cmd->pool, p, x-p), + x+1); + } + else { + apr_table_addn(cluster->params, + apr_pstrdup(cmd->pool, p), + ""); + } + } + + if (backend->check_config == NULL) { + return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to " + "provider a configuration checker", + cluster->provider); + } + + rv = backend->check_config(backend->baton, cmd, cluster->params); + + if (rv) { + return rv; + } + + apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster); + + return NULL; +} + +static void *create_dir_config(apr_pool_t *p, char *dummy) +{ + serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t)); new->on = 0; return new; } +static void *create_server_config(apr_pool_t *p, server_rec *s) +{ + serf_server_config_t *ctx = + (serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t)); + + ctx->clusters = apr_hash_make(p); + + return ctx; +} + +static void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv) +{ + serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t)); + serf_server_config_t *base = (serf_server_config_t *) basev; + serf_server_config_t *overrides = (serf_server_config_t *) overridesv; + + ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters); + return ctx; +} + static const command_rec serf_cmds[] = { - AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES/*making shit up*/, - "A prefix and destination"), + AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF, + "Configure a cluster backend"), + AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES, + "URL to reverse proxy to"), {NULL} }; +typedef struct hb_table_baton_t { + apr_pool_t *p; + const char *msg; +} hb_table_baton_t; + +static int hb_table_check(void *rec, const char *key, const char *value) 
+{ + hb_table_baton_t *b = (hb_table_baton_t*)rec; + if (strcmp(key, "path") != 0) { + b->msg = apr_psprintf(b->p, + "SerfCluster Heartbeat Invalid parameter '%s'", + key); + return 1; + } + + return 0; +} + +static const char* hb_config_check(void *baton, + cmd_parms *cmd, + apr_table_t *params) +{ + hb_table_baton_t b; + + if (apr_is_empty_table(params)) { + return "SerfCluster Heartbeat requires a path to the heartbat information."; + } + + b.p = cmd->pool; + b.msg = NULL; + + apr_table_do(hb_table_check, &b, params, NULL); + + if (b.msg) { + return b.msg; + } + + return NULL; +} + +typedef struct hb_server_t { + const char *ip; + int busy; + int ready; + int seen; +} hb_server_t; + +static void +argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms) +{ + char *key; + char *value; + char *strtok_state; + + key = apr_strtok(str, "&", &strtok_state); + while (key) { + value = strchr(key, '='); + if (value) { + *value = '\0'; /* Split the string in two */ + value++; /* Skip past the = */ + } + else { + value = "1"; + } + ap_unescape_url(key); + ap_unescape_url(value); + apr_table_set(parms, key, value); + key = apr_strtok(NULL, "&", &strtok_state); + } +} + +static apr_status_t read_heartbeats(const char *path, + apr_array_header_t *servers, + apr_pool_t *pool) +{ + apr_finfo_t fi; + apr_status_t rv; + apr_file_t *fp; + + if (!path) { + return APR_SUCCESS; + } + + rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED, + APR_OS_DEFAULT, pool); + + if (rv) { + return rv; + } + + rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp); + + if (rv) { + return rv; + } + + { + char *t; + int lineno = 0; + apr_table_t *hbt = apr_table_make(pool, 10); + char buf[4096]; + + while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) { + hb_server_t *server; + const char *ip; + lineno++; + + /* comment */ + if (buf[0] == '#') { + continue; + } + + + /* line format: IP, one space, then an '&'-separated key=value string, then \n */ + t = strchr(buf, ' '); + if (!t) { + continue; + } + + ip = apr_pstrndup(pool, buf, t - buf); + 
t++; + server = apr_pcalloc(pool, sizeof(hb_server_t)); + server->ip = ip; + server->seen = -1; + apr_table_clear(hbt); + + argstr_to_table(pool, apr_pstrdup(pool, t), hbt); + + if (apr_table_get(hbt, "busy")) { + server->busy = atoi(apr_table_get(hbt, "busy")); + } + + if (apr_table_get(hbt, "ready")) { + server->ready = atoi(apr_table_get(hbt, "ready")); + } + + if (apr_table_get(hbt, "lastseen")) { + server->seen = atoi(apr_table_get(hbt, "lastseen")); + } + + if (server->busy == 0 && server->ready != 0) { + /* Server has zero threads active, but lots of them ready, + * it likely just started up, so lets /4 the number ready, + * to prevent us from completely flooding it with all new + * requests. + */ + server->ready = server->ready / 4; + } + + APR_ARRAY_PUSH(servers, hb_server_t *) = server; + } + } + + return APR_SUCCESS; +} + +static int hb_server_sort(const void *a_, const void *b_) +{ + hb_server_t *a = (hb_server_t*)a; + hb_server_t *b = (hb_server_t*)b; + if (a->ready == b->ready) { + return 0; + } + else if (a->ready > b->ready) { + return -1; + } + else { + return 1; + } +} + +static int hb_list_servers(void *baton, + request_rec *r, + apr_table_t *params, + apr_array_header_t **out_servers) +{ + int i; + hb_server_t *hbs; + apr_status_t rv; + apr_pool_t *tpool; + apr_array_header_t *tmpservers; + apr_array_header_t *servers; + const char *path = apr_table_get(params, "path"); + + apr_pool_create(&tpool, r->pool); + + tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *)); + rv = read_heartbeats(path, tmpservers, tpool); + + if (rv) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, + "SerfCluster: Heartbeat unable to read '%s'", path); + apr_pool_destroy(tpool); + return HTTP_INTERNAL_SERVER_ERROR; + } + + qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *), + hb_server_sort); + + servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *)); + for (i = 0; + i < tmpservers->nelts; + i++) + { + ap_serf_server_t *x; + 
+ hbs = APR_ARRAY_IDX(tmpservers, i, hb_server_t *); + if (hbs->ready > 0) { + x = apr_palloc(r->pool, sizeof(ap_serf_server_t)); + x->ip = apr_pstrdup(r->pool, hbs->ip); + /* TODO: expand multicast format to support ports? */ + x->port = 80; + APR_ARRAY_PUSH(servers, ap_serf_server_t *) = x; + } + } + + *out_servers = servers; + apr_pool_destroy(tpool); + return OK; +} + +static const ap_serf_cluster_provider_t builtin_heartbeat = +{ + "heartbeat", + NULL, + &hb_config_check, + &hb_list_servers, + NULL, + NULL +}; + static void register_hooks(apr_pool_t *p) { + ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER, + "heartbeat", "0", &builtin_heartbeat); + ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST); } module AP_MODULE_DECLARE_DATA serf_module = { STANDARD20_MODULE_STUFF, - create_config, - NULL, - NULL, + create_dir_config, NULL, + create_server_config, + merge_server_config, serf_cmds, register_hooks }; diff --git a/modules/proxy/mod_serf.h b/modules/proxy/mod_serf.h new file mode 100644 index 0000000000..80f40c6b5e --- /dev/null +++ b/modules/proxy/mod_serf.h @@ -0,0 +1,102 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/** + * @file mod_serf.h + * @brief Serf Interfaces + * + */ + +#include "httpd.h" +#include "http_config.h" +#include "ap_provider.h" + +#ifndef _MOD_SERF_H_ +#define _MOD_SERF_H_ +/** + * @addtogroup Serf_cluster_provider + * @{ + */ +#define AP_SERF_CLUSTER_PROVIDER "serf_cluster" +typedef struct ap_serf_server_t ap_serf_server_t; +struct ap_serf_server_t { + /* TODO: consider using apr_sockaddr_t, except they suck. */ + const char *ip; + apr_port_t port; +}; + +typedef struct ap_serf_cluster_provider_t ap_serf_cluster_provider_t; +struct ap_serf_cluster_provider_t { + /** + * Human readable name of this provider, used in configuration. + */ + const char *name; + /** + * Baton passed to all methods in this provider. + * + * This field may be NULL. + */ + void *baton; + /** + * Check that the key/value pairs used to configure the + * cluster are valid. + * + * Return non-NULL on failure with an error message, like standard httpd + * configuration directives. + * + * This field must be set. + */ + const char* (*check_config)(void *baton, + cmd_parms *cmd, + apr_table_t *params); + /** + * Provide an ordered array of ap_serf_server_t in the order that + * mod_serf should attempt to use them. If a server on the list + * is known to be not responding, it may be skipped. If mod_serf is + * unable to contact any of the servers, a 502 will be returned to the + * client. + * + * Returns OK on success, all other return codes will result in a 500. + * + * This field must be set. + */ + int (*list_servers)(void *baton, + request_rec *r, + apr_table_t *params, + apr_array_header_t **servers); + /** + * If a request was successfully fulfilled by this address, feedback will + * be given to the provider, so it may make better recommendations. + * + * This field may be NULL. 
+ */ + void (*server_success)(void *baton, request_rec *r, apr_table_t *params, + ap_serf_server_t* server); + /** + * If a request failed to be fulfilled by this address, feedback will + * be given to the provider, so it may make better recommendations. + * + * This field may be NULL. + */ + void (*server_failure)(void *baton, request_rec *r, apr_table_t *params, + ap_serf_server_t* server); + +}; +/** @} */ + +#endif /* _MOD_SERF_H_ */ +