* limitations under the License.
*/
+#include "mod_serf.h"
+
#include "httpd.h"
#include "http_core.h"
#include "http_config.h"
#include "serf.h"
#include "apr_uri.h"
+#include "apr_strings.h"
+
module AP_MODULE_DECLARE_DATA serf_module;
typedef struct {
int on;
apr_uri_t url;
-} serf_config_rec;
+} serf_config_t;
+
+typedef struct {
+ const char *name;
+ const char *provider;
+ apr_table_t *params;
+} serf_cluster_t;
+
+typedef struct {
+ /* name -> serf_cluster_t* */
+ apr_hash_t *clusters;
+} serf_server_config_t;
typedef struct {
int rstatus;
int done_headers;
int keep_reading;
request_rec *r;
- serf_config_rec *conf;
+ serf_config_t *conf;
serf_ssl_context_t *ssl_ctx;
serf_bucket_alloc_t *bkt_alloc;
} s_baton_t;
return c;
}
-int copy_headers_in(void *vbaton, const char *key, const char *value)
+static int copy_headers_in(void *vbaton, const char *key, const char *value)
{
serf_bucket_t *hdrs_bkt = (serf_bucket_t *)vbaton;
return 0;
}
-int copy_headers_out(void *vbaton, const char *key, const char *value)
+static int copy_headers_out(void *vbaton, const char *key, const char *value)
{
s_baton_t *ctx = vbaton;
int done = 0;
return APR_SUCCESS;
}
-static int drive_serf(request_rec *r, serf_config_rec *conf)
+/* TOOD: rewrite drive_serf to make it async */
+static int drive_serf(request_rec *r, serf_config_t *conf)
{
apr_status_t rv;
apr_pool_t *pool = r->pool;
serf_context_t *serfme;
serf_connection_t *conn;
serf_request_t *srequest;
+ serf_server_config_t *ctx =
+ (serf_server_config_t *)ap_get_module_config(r->server->module_config,
+ &serf_module);
+
+ if (strcmp(conf->url.scheme, "cluster") == 0) {
+ int rc;
+ ap_serf_cluster_provider_t *cp;
+ serf_cluster_t *cluster;
+ apr_array_header_t *servers = NULL;
+ ap_serf_server_t *choice;
+
+ /* TODO: could this be optimized in post-config to pre-setup the
+ * pointers to the right cluster inside the conf structure?
+ */
+ cluster = apr_hash_get(ctx->clusters,
+ conf->url.hostname,
+ APR_HASH_KEY_STRING);
+ if (!cluster) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+ "SerfCluster: unable to find cluster %s", conf->url.hostname);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
+
+ if (cp == NULL) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+ "SerfCluster: unable to find provider %s", cluster->provider);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ if (cp->list_servers == NULL) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+ "SerfCluster: %s is missing list servers provider.", cluster->provider);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ rc = cp->list_servers(cp->baton,
+ r,
+ cluster->params,
+ &servers);
+
+ if (rc != OK) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r,
+ "SerfCluster: %s list servers returned failure", cluster->provider);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ if (servers == NULL || apr_is_empty_array(servers)) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r,
+ "SerfCluster: %s failed to provide a list of servers", cluster->provider);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
- /* XXXXX: cache dns? */
- rv = apr_sockaddr_info_get(&address, conf->url.hostname,
- APR_UNSPEC, conf->url.port, 0,
- pool);
+ /* TOOD: restructure try all servers in the array !! */
+ choice = APR_ARRAY_IDX(servers, 0, ap_serf_server_t *);
+
+ rv = apr_sockaddr_info_get(&address, choice->ip,
+ APR_UNSPEC, choice->port, 0,
+ pool);
+ }
+ else {
+ /* XXXXX: cache dns? */
+ rv = apr_sockaddr_info_get(&address, conf->url.hostname,
+ APR_UNSPEC, conf->url.port, 0,
+ pool);
+ }
if (rv != APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "Unable to resolve: %s", conf->url.hostname);
static int serf_handler(request_rec *r)
{
- serf_config_rec *conf = ap_get_module_config(r->per_dir_config,
- &serf_module);
+ serf_config_t *conf = ap_get_module_config(r->per_dir_config,
+ &serf_module);
if (conf->on == 0) {
return DECLINED;
const char *vdest)
{
apr_status_t rv;
- serf_config_rec *conf = (serf_config_rec *) vconf;
+ serf_config_t *conf = (serf_config_t *) vconf;
rv = apr_uri_parse(cmd->pool, vdest, &conf->url);
return NULL;
}
-static void *create_config(apr_pool_t *p, char *dummy)
+/* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
+
+static const char *add_cluster(cmd_parms *cmd, void *d,
+ int argc, char *const argv[])
{
- serf_config_rec *new = (serf_config_rec *) apr_pcalloc(p, sizeof(serf_config_rec));
+ const char *rv;
+ ap_serf_cluster_provider_t *backend;
+ int i;
+ serf_cluster_t *cluster = NULL;
+ serf_server_config_t *ctx =
+ (serf_server_config_t *)ap_get_module_config(cmd->server->module_config,
+ &serf_module);
+
+ const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+
+ if (err != NULL) {
+ return err;
+ }
+
+ if (argc < 2) {
+ return "SerfCluster must have at least a name and provider.";
+ }
+
+ cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t));
+ cluster->name = apr_pstrdup(cmd->pool, argv[0]);
+ cluster->provider = apr_pstrdup(cmd->pool, argv[1]);
+ cluster->params = apr_table_make(cmd->pool, 6);
+
+ backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
+
+ if (backend == NULL) {
+ return apr_psprintf(cmd->pool, "SerfCluster: unable to find "
+ "provider '%s'", cluster->provider);
+ }
+
+ for (i = 2; i < argc; i++) {
+ const char *p = argv[i];
+ const char *x = ap_strchr(p, '=');
+
+ if (x && strlen(p) > 1) {
+ apr_table_addn(cluster->params,
+ apr_pstrndup(cmd->pool, p, x-p),
+ x+1);
+ }
+ else {
+ apr_table_addn(cluster->params,
+ apr_pstrdup(cmd->pool, p),
+ "");
+ }
+ }
+
+ if (backend->check_config == NULL) {
+ return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to "
+ "provider a configuration checker",
+ cluster->provider);
+ }
+
+ rv = backend->check_config(backend->baton, cmd, cluster->params);
+
+ if (rv) {
+ return rv;
+ }
+
+ apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster);
+
+ return NULL;
+}
+
+static void *create_dir_config(apr_pool_t *p, char *dummy)
+{
+ serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t));
new->on = 0;
return new;
}
+static void *create_server_config(apr_pool_t *p, server_rec *s)
+{
+ serf_server_config_t *ctx =
+ (serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t));
+
+ ctx->clusters = apr_hash_make(p);
+
+ return ctx;
+}
+
+static void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv)
+{
+ serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t));
+ serf_server_config_t *base = (serf_server_config_t *) basev;
+ serf_server_config_t *overrides = (serf_server_config_t *) overridesv;
+
+ ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters);
+ return ctx;
+}
+
static const command_rec serf_cmds[] =
{
- AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES/*making shit up*/,
- "A prefix and destination"),
+ AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF,
+ "Configure a cluster backend"),
+ AP_INIT_TAKE1("SerfPass", add_pass, NULL, OR_INDEXES,
+ "URL to reverse proxy to"),
{NULL}
};
+typedef struct hb_table_baton_t {
+ apr_pool_t *p;
+ const char *msg;
+} hb_table_baton_t;
+
+static int hb_table_check(void *rec, const char *key, const char *value)
+{
+ hb_table_baton_t *b = (hb_table_baton_t*)rec;
+ if (strcmp(key, "path") != 0) {
+ b->msg = apr_psprintf(b->p,
+ "SerfCluster Heartbeat Invalid parameter '%s'",
+ key);
+ return 1;
+ }
+
+ return 0;
+}
+
+static const char* hb_config_check(void *baton,
+ cmd_parms *cmd,
+ apr_table_t *params)
+{
+ hb_table_baton_t b;
+
+ if (apr_is_empty_table(params)) {
+ return "SerfCluster Heartbeat requires a path to the heartbat information.";
+ }
+
+ b.p = cmd->pool;
+ b.msg = NULL;
+
+ apr_table_do(hb_table_check, &b, params, NULL);
+
+ if (b.msg) {
+ return b.msg;
+ }
+
+ return NULL;
+}
+
+typedef struct hb_server_t {
+ const char *ip;
+ int busy;
+ int ready;
+ int seen;
+} hb_server_t;
+
+static void
+argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
+{
+ char *key;
+ char *value;
+ char *strtok_state;
+
+ key = apr_strtok(str, "&", &strtok_state);
+ while (key) {
+ value = strchr(key, '=');
+ if (value) {
+ *value = '\0'; /* Split the string in two */
+ value++; /* Skip passed the = */
+ }
+ else {
+ value = "1";
+ }
+ ap_unescape_url(key);
+ ap_unescape_url(value);
+ apr_table_set(parms, key, value);
+ key = apr_strtok(NULL, "&", &strtok_state);
+ }
+}
+
+static apr_status_t read_heartbeats(const char *path,
+ apr_array_header_t *servers,
+ apr_pool_t *pool)
+{
+ apr_finfo_t fi;
+ apr_status_t rv;
+ apr_file_t *fp;
+
+ if (!path) {
+ return APR_SUCCESS;
+ }
+
+ rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
+ APR_OS_DEFAULT, pool);
+
+ if (rv) {
+ return rv;
+ }
+
+ rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
+
+ if (rv) {
+ return rv;
+ }
+
+ {
+ char *t;
+ int lineno = 0;
+ apr_table_t *hbt = apr_table_make(pool, 10);
+ char buf[4096];
+
+ while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) {
+ hb_server_t *server;
+ const char *ip;
+ lineno++;
+
+ /* comment */
+ if (buf[0] == '#') {
+ continue;
+ }
+
+
+ /* line format: <IP> <query_string>\n */
+ t = strchr(buf, ' ');
+ if (!t) {
+ continue;
+ }
+
+ ip = apr_pstrndup(pool, buf, t - buf);
+ t++;
+ server = apr_pcalloc(pool, sizeof(hb_server_t));
+ server->ip = ip;
+ server->seen = -1;
+ apr_table_clear(hbt);
+
+ argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
+
+ if (apr_table_get(hbt, "busy")) {
+ server->busy = atoi(apr_table_get(hbt, "busy"));
+ }
+
+ if (apr_table_get(hbt, "ready")) {
+ server->ready = atoi(apr_table_get(hbt, "ready"));
+ }
+
+ if (apr_table_get(hbt, "lastseen")) {
+ server->seen = atoi(apr_table_get(hbt, "lastseen"));
+ }
+
+ if (server->busy == 0 && server->ready != 0) {
+ /* Server has zero threads active, but lots of them ready,
+ * it likely just started up, so lets /4 the number ready,
+ * to prevent us from completely flooding it with all new
+ * requests.
+ */
+ server->ready = server->ready / 4;
+ }
+
+ APR_ARRAY_PUSH(servers, hb_server_t *) = server;
+ }
+ }
+
+ return APR_SUCCESS;
+}
+
+static int hb_server_sort(const void *a_, const void *b_)
+{
+ hb_server_t *a = (hb_server_t*)a;
+ hb_server_t *b = (hb_server_t*)b;
+ if (a->ready == b->ready) {
+ return 0;
+ }
+ else if (a->ready > b->ready) {
+ return -1;
+ }
+ else {
+ return 1;
+ }
+}
+
+static int hb_list_servers(void *baton,
+ request_rec *r,
+ apr_table_t *params,
+ apr_array_header_t **out_servers)
+{
+ int i;
+ hb_server_t *hbs;
+ apr_status_t rv;
+ apr_pool_t *tpool;
+ apr_array_header_t *tmpservers;
+ apr_array_header_t *servers;
+ const char *path = apr_table_get(params, "path");
+
+ apr_pool_create(&tpool, r->pool);
+
+ tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *));
+ rv = read_heartbeats(path, tmpservers, tpool);
+
+ if (rv) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+ "SerfCluster: Heartbeat unable to read '%s'", path);
+ apr_pool_destroy(tpool);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *),
+ hb_server_sort);
+
+ servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *));
+ for (i = 0;
+ i < tmpservers->nelts;
+ i++)
+ {
+ ap_serf_server_t *x;
+
+ hbs = APR_ARRAY_IDX(tmpservers, i, hb_server_t *);
+ if (hbs->ready > 0) {
+ x = apr_palloc(r->pool, sizeof(ap_serf_server_t));
+ x->ip = apr_pstrdup(r->pool, hbs->ip);
+ /* TODO: expand multicast format to support ports? */
+ x->port = 80;
+ APR_ARRAY_PUSH(servers, ap_serf_server_t *) = x;
+ }
+ }
+
+ *out_servers = servers;
+ apr_pool_destroy(tpool);
+ return OK;
+}
+
+static const ap_serf_cluster_provider_t builtin_heartbeat =
+{
+ "heartbeat",
+ NULL,
+ &hb_config_check,
+ &hb_list_servers,
+ NULL,
+ NULL
+};
+
static void register_hooks(apr_pool_t *p)
{
+ ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
+ "heartbeat", "0", &builtin_heartbeat);
+
ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST);
}
module AP_MODULE_DECLARE_DATA serf_module =
{
STANDARD20_MODULE_STUFF,
- create_config,
- NULL,
- NULL,
+ create_dir_config,
NULL,
+ create_server_config,
+ merge_server_config,
serf_cmds,
register_hooks
};
--- /dev/null
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * @file mod_serf.h
+ * @brief Serf Interfaces
+ *
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "ap_provider.h"
+
+#ifndef _MOD_SERF_H_
+#define _MOD_SERF_H_
+/**
+ * @addtogroup Serf_cluster_provider
+ * @{
+ */
+#define AP_SERF_CLUSTER_PROVIDER "serf_cluster"
+typedef struct ap_serf_server_t ap_serf_server_t;
+struct ap_serf_server_t {
+ /* TOOD: consider using apr_sockaddr_t, except they suck. */
+ const char *ip;
+ apr_port_t port;
+};
+
+typedef struct ap_serf_cluster_provider_t ap_serf_cluster_provider_t;
+struct ap_serf_cluster_provider_t {
+ /**
+ * Human readable name of this provider, used in configuration.
+ */
+ const char *name;
+ /**
+ * Baton passed to all methods in this provider.
+ *
+ * This field may be NULL.
+ */
+ void *baton;
+ /**
+ * Check that the key/value pairs used to configure the
+ * cluster are valid.
+ *
+ * Return non-NULL on failure with an error message, like standard httpd
+ * configuration directives.
+ *
+ * This field must be set.
+ */
+ const char* (*check_config)(void *baton,
+ cmd_parms *cmd,
+ apr_table_t *params);
+ /**
+ * Provide an ordered array of ap_serf_server_t in the order that
+ * mod_serf should attempt to use them. If a server on the list
+ * is known to be not responding, it may be skipped. If mod_serf is
+ * unable to contact any of the servers, a 502 will be returned to the
+ * client.
+ *
+ * Returns OK on sucess, all other return codes will result in a 500.
+ *
+ * This field must be set.
+ */
+ int (*list_servers)(void *baton,
+ request_rec *r,
+ apr_table_t *params,
+ apr_array_header_t **servers);
+ /**
+ * If a request was successfully fulfilled by this address, feedback will
+ * be given to the provider, so it may make better recommendations.
+ *
+ * This field may be NULL.
+ */
+ void (*server_success)(void *baton, request_rec *r, apr_table_t *params,
+ ap_serf_server_t* server);
+ /**
+ * If a request failed to be fulfilled by this address, feedback will
+ * be given to the provider, so it may make better recommendations.
+ *
+ * This field may be NULL.
+ */
+ void (*server_failure)(void *baton, request_rec *r, apr_table_t *params,
+ ap_serf_server_t* server);
+
+};
+/** @} */
+
+#endif /* _MOD_SERF_H_ */
+