1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "mod_proxy.h"
18 #include "scoreboard.h"
20 #include "apr_version.h"
21 #include "apr_hooks.h"
22 #include "ap_slotmem.h"
23 #include "heartbeat.h"
/* Overridable default: the value below is only defined when
 * LBM_HEARTBEAT_MAX_LASTSEEN has not been defined already.  find_best_hb()
 * compares each heartbeat record's `seen` value against it.
 */
25 #ifndef LBM_HEARTBEAT_MAX_LASTSEEN
26 /* If we haven't seen a heartbeat in the last N seconds, don't count this IP
29 #define LBM_HEARTBEAT_MAX_LASTSEEN (10)
/* Forward declaration of this module's record so that configuration lookups
 * can reference it before its definition later in the file.
 */
32 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;
/* Slotmem provider; assigned in lb_hb_init() from ap_lookup_provider(). */
34 static const ap_slotmem_provider_t *storage = NULL;
/* Slotmem instance that lb_hb_init() tries to attach under the name
 * "mod_heartmonitor".  It is tested against NULL after the attach call.
 */
35 static ap_slotmem_instance_t *hm_serversmem = NULL;
38 * configuration structure
39 * path: path of the file where the heartbeat information is stored.
/* Per-server configuration record of this module.  The code in this file
 * only touches its `path` member: it is assigned in lb_hb_create_config(),
 * lb_hb_merge_config(), cmd_lb_hb_storage() and lb_hb_init(), and read in
 * find_best_hb().
 */
41 typedef struct lb_hb_ctx_t
/* One heartbeat record per backend.  Members referenced by the code in this
 * file: ip (used as the hash key), busy, ready, seen, port, id, and worker
 * (the proxy_worker paired with the record in find_best_hb()).
 */
46 typedef struct hb_server_t {
/* Splits `str` on '&' and stores the resulting key/value pairs in `parms`.
 *
 * p:     pool argument; not referenced by the statements shown here.
 * str:   writable string of '&'-separated "key=value" items.  It is
 *        modified: it is tokenised with apr_strtok() and the first '=' of
 *        a token is overwritten with a NUL to separate key from value.
 * parms: destination table, filled through apr_table_set().
 *
 * The value is URL-unescaped in place before being stored.
 */
57 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
63 key = apr_strtok(str, "&", &strtok_state);
65 value = strchr(key, '=');
67 *value = '\0'; /* Split the string in two */
68 value++; /* Skip past the = */
74 ap_unescape_url(value);
75 apr_table_set(parms, key, value);
/* NOTE(review): `r` is not a parameter of this function, so the log call
 * below cannot be live code as shown -- presumably it is disabled debug
 * output; confirm.
 */
77 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
78 "Found query arg: %s = %s", key, value);
80 key = apr_strtok(NULL, "&", &strtok_state);
/* Reads heartbeat records from the file at `path` and merges them into the
 * `servers` hash, keyed by the IP string found at the start of each line.
 *
 * path:    heartbeat file to open.
 * servers: hash of hb_server_t records; a record is allocated and inserted
 *          the first time an IP is seen, otherwise the existing one is
 *          updated.
 * The `pool` used below backs the file handle, the brigades, the scratch
 * table and any new hb_server_t records.
 *
 * Of the parsed query-string keys, "busy", "ready", "lastseen" and "port"
 * are consumed, each converted with atoi() when present.
 *
 * NOTE(review): atoi() reports no errors, so a malformed value reads as 0
 * -- confirm that is acceptable for this file format.
 */
84 static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
/* Read-only, binary, buffered open of the heartbeat file. */
95 rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
96 APR_OS_DEFAULT, pool);
/* Only the size is requested; it bounds the file bucket inserted below. */
102 rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
111 apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
112 apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
113 apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
114 apr_table_t *hbt = apr_table_make(pool, 10);
/* bb covers the file from offset 0 for fi.size bytes. */
116 apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
121 apr_size_t bsize = sizeof(buf);
/* tmpbb carries one line at a time, so empty it before the next split. */
124 apr_brigade_cleanup(tmpbb);
126 if (APR_BRIGADE_EMPTY(bb)) {
/* Move one line, limited to sizeof(buf) bytes, from bb into tmpbb. */
130 rv = apr_brigade_split_line(tmpbb, bb,
131 APR_BLOCK_READ, sizeof(buf));
/* NOTE(review): apr_brigade_flatten() copies raw bytes and does not append
 * a NUL; the step that terminates buf before strchr() is not visible in
 * this excerpt -- confirm it exists.
 */
138 apr_brigade_flatten(tmpbb, buf, &bsize);
151 /* line format: <IP> <query_string>\n */
152 t = strchr(buf, ' ');
/* The IP is the text before the first space. */
157 ip = apr_pstrndup(pool, buf, t - buf);
160 server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
162 if (server == NULL) {
163 server = apr_pcalloc(pool, sizeof(hb_server_t));
168 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
/* The scratch table is reused for every line. */
171 apr_table_clear(hbt);
/* A copy is passed because argstr_to_table() writes into its input. */
173 argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
175 if (apr_table_get(hbt, "busy")) {
176 server->busy = atoi(apr_table_get(hbt, "busy"));
179 if (apr_table_get(hbt, "ready")) {
180 server->ready = atoi(apr_table_get(hbt, "ready"));
183 if (apr_table_get(hbt, "lastseen")) {
184 server->seen = atoi(apr_table_get(hbt, "lastseen"));
187 if (apr_table_get(hbt, "port")) {
188 server->port = atoi(apr_table_get(hbt, "port"));
191 if (server->busy == 0 && server->ready != 0) {
192 /* Server has zero threads active, but lots of them ready,
193 * it likely just started up, so lets /4 the number ready,
194 * to prevent us from completely flooding it with all new
197 server->ready = server->ready / 4;
/* Callback handed to the slotmem provider's doall() by
 * readslot_heartbeats(): copies one slot into the `servers` hash.
 *
 * mem:  slot contents, interpreted as hm_slot_server_t.
 * data: the apr_hash_t of hb_server_t records, keyed by IP string.
 * pool: allocates the hb_server_t and its copy of the IP when the slot's
 *       IP is not in the hash yet.
 */
206 static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
208 hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
209 apr_hash_t *servers = (apr_hash_t *) data;
210 hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
211 if (server == NULL) {
212 server = apr_pcalloc(pool, sizeof(hb_server_t));
/* Copy the IP so the hash key does not point into the slot. */
213 server->ip = apr_pstrdup(pool, slotserver->ip);
216 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
/* Refresh the record from the slot's current values. */
219 server->busy = slotserver->busy;
220 server->ready = slotserver->ready;
221 server->seen = slotserver->seen;
222 server->id = slotserver->id;
/* Nothing busy but slots ready: count only a quarter of the ready slots,
 * the same adjustment readfile_heartbeats() applies.
 */
223 if (server->busy == 0 && server->ready != 0) {
224 server->ready = server->ready / 4;
/* Fills `servers` from the attached slotmem by handing hm_read() to the
 * provider's doall(), with `servers` as the callback data.
 */
228 static apr_status_t readslot_heartbeats(apr_hash_t *servers,
231 storage->doall(hm_serversmem, hm_read, servers, pool);
/* Loads heartbeat data into `servers`, either from the slotmem
 * (readslot_heartbeats) or from the file at `path` (readfile_heartbeats).
 *
 * NOTE(review): the condition that selects between the two sources is not
 * visible in this excerpt -- presumably whether hm_serversmem was
 * attached; confirm.
 */
236 static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers,
241 rv = readslot_heartbeats(servers, pool);
243 rv = readfile_heartbeats(path, servers, pool);
/*
 * Finding a random number in a range.
 *      n' = a + n(b-a+1)/(M+1)
 * where:
 *      n' = random number in range
 *      a  = low end of range
 *      b  = high end of range
 *      n  = random number of 0..M
 *      M  = largest value n can take (the rr_tmax argument)
 *
 * Algorithm 'borrowed' from PHP's rand() function.
 *
 * RAND_RANGE(n, min, max, tmax) rescales the lvalue n in place from
 * 0..tmax to min..max, both ends inclusive.  The scaling is done in
 * double and truncated through long before it is stored back.  The first
 * argument is evaluated twice, so it must have no side effects.
 *
 * The parameter names avoid a double underscore because such identifiers
 * are reserved for the implementation, and the expansion is parenthesised
 * so that it behaves as one expression wherever it is used.
 */
#define RAND_RANGE(rr_n, rr_min, rr_max, rr_tmax) \
    ((rr_n) = (rr_min) + (long) ((double) ((rr_max) - (rr_min) + 1.0) * ((rr_n) / ((rr_tmax) + 1.0))))
/* Stores a random value between `min` and `max` in *number.
 *
 * sizeof(apr_uint32_t) random bytes are written into *number by
 * apr_generate_random_bytes(), then rescaled in place with RAND_RANGE
 * against APR_UINT32_MAX.
 *
 * NOTE(review): how the result of apr_generate_random_bytes() is handled
 * is not visible in this excerpt -- confirm failures are returned to the
 * caller, which checks this function's status.
 */
261 static apr_status_t random_pick(apr_uint32_t *number,
266 apr_generate_random_bytes((void*)number, sizeof(apr_uint32_t));
272 RAND_RANGE(*number, min, max, APR_UINT32_MAX);
/* Chooses a balancer worker at random, weighted by the number of ready
 * slots each backend reported in its heartbeat.
 *
 * Steps visible in this function:
 *  - heartbeat records are loaded into a hash allocated from a sub-pool
 *    of r->pool;
 *  - for every balancer worker, the record keyed by the worker's hostname
 *    is looked up; an unusable worker is given a retry; a usable worker
 *    whose record has `seen` below LBM_HEARTBEAT_MAX_LASTSEEN adds its
 *    `ready` count to `openslots` and is queued in `up_servers`;
 *  - a number is drawn with random_pick(&pick, 0, openslots) and the
 *    queued record whose range (c, c + ready] contains it supplies the
 *    candidate worker.
 *
 * The sub-pool is destroyed on each exit path shown.
 */
277 static proxy_worker *find_best_hb(proxy_balancer *balancer,
282 apr_uint32_t openslots = 0;
283 proxy_worker **worker;
285 apr_array_header_t *up_servers;
286 proxy_worker *mycandidate = NULL;
291 ap_get_module_config(r->server->module_config,
292 &lbmethod_heartbeat_module);
/* Scratch pool for the heartbeat hash and the candidate array. */
294 apr_pool_create(&tpool, r->pool);
296 servers = apr_hash_make(tpool);
298 rv = read_heartbeats(ctx->path, servers, tpool);
301 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
302 "lb_heartbeat: Unable to read heartbeats at '%s'",
304 apr_pool_destroy(tpool);
/* Sized for the case where every known server qualifies. */
308 up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
310 for (i = 0; i < balancer->workers->nelts; i++) {
311 worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
/* Records are keyed by IP string, so only workers whose hostname is
 * spelled exactly like that key are matched here.
 */
312 server = apr_hash_get(servers, (*worker)->hostname, APR_HASH_KEY_STRING);
318 if (!PROXY_WORKER_IS_USABLE(*worker)) {
319 ap_proxy_retry_worker("BALANCER", *worker, r->server);
322 if (PROXY_WORKER_IS_USABLE(*worker)) {
/* NOTE(review): apr_hash_get() yields NULL for a worker without a
 * heartbeat record; the guard for that case is not visible in this
 * excerpt -- confirm `server` is checked before this dereference.
 */
323 server->worker = *worker;
324 if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
325 openslots += server->ready;
326 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
333 apr_uint32_t pick = 0;
335 rv = random_pick(&pick, 0, openslots);
338 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
339 "lb_heartbeat: failed picking a random number. how random.");
340 apr_pool_destroy(tpool);
344 for (i = 0; i < up_servers->nelts; i++) {
345 server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
/* NOTE(review): random_pick() is given a lower bound of 0 while `pick > c`
 * excludes pick == c.  If `c` starts at 0 (its initialisation is not
 * visible here), a pick of 0 selects no worker -- confirm.
 */
346 if (pick > c && pick <= c + server->ready) {
347 mycandidate = server->worker;
354 apr_pool_destroy(tpool);
/* reset() and age() take the balancer and the server record and return an
 * apr_status_t.  Their bodies are not visible in this excerpt; presumably
 * they fill the corresponding slots of `heartbeat` below -- confirm.
 */
359 static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
363 static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
/* Balancer-method record that register_hooks() registers as the
 * PROXY_LBMETHOD provider named "heartbeat", version "0".
 */
367 static const proxy_balancer_method heartbeat =
/* post_config hook (installed by register_hooks()): looks up the "shared"
 * slotmem provider and tries to attach to the slotmem named
 * "mod_heartmonitor".
 *
 * A userdata entry on s->process->pool, keyed by userdata_key, marks that
 * the hook has already run once; the first call is the "do nothing" one.
 * Alongside the "Using slotmem from mod_heartmonitor" notice, ctx->path is
 * replaced by the placeholder string "(slotmem)".
 */
376 static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
377 apr_pool_t *ptemp, server_rec *s)
379 const char *userdata_key = "mod_lbmethod_heartbeat_init";
383 lb_hb_ctx_t *ctx = ap_get_module_config(s->module_config,
384 &lbmethod_heartbeat_module);
386 apr_pool_userdata_get(&data, userdata_key, s->process->pool);
388 /* first call do nothing */
389 apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool);
392 storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shared", "0");
394 ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "ap_lookup_provider %s failed", AP_SLOTMEM_PROVIDER_GROUP);
398 /* Try to use a slotmem created by mod_heartmonitor */
/* The status returned by attach() is not captured on this line; success
 * is judged by hm_serversmem being non-NULL afterwards.
 */
399 storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p);
400 if (!hm_serversmem) {
401 ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "No slotmem from mod_heartmonitor");
403 ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "Using slotmem from mod_heartmonitor");
406 ctx->path = "(slotmem)";
/* Registers the "heartbeat" balancer method (provider group PROXY_LBMETHOD,
 * version "0") and installs lb_hb_init() as a post_config hook at
 * APR_HOOK_MIDDLE.  aszPred, which names mod_heartmonitor.c, is passed as
 * the hook's predecessor list.
 */
411 static void register_hooks(apr_pool_t *p)
413 static const char * const aszPred[]={ "mod_heartmonitor.c", NULL };
414 ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
415 ap_hook_post_config(lb_hb_init, aszPred, NULL, APR_HOOK_MIDDLE);
/* Creates the per-server configuration record, with `path` defaulting to
 * "logs/hb.dat" resolved through ap_server_root_relative().
 *
 * p: pool the record and the path string are allocated from.
 * s: server record; not referenced by the statements shown here.
 */
418 static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
420 lb_hb_ctx_t *ctx = (lb_hb_ctx_t *) apr_palloc(p, sizeof(lb_hb_ctx_t));
422 ctx->path = ap_server_root_relative(p, "logs/hb.dat");
/* Merges two per-server configuration records into a new zero-initialised
 * one allocated from `p`.
 *
 * The merged `path` is a copy of overrides->path when that is set.  The
 * second assignment copies base->path instead; it is presumably the else
 * branch, whose keyword is not visible in this excerpt -- confirm.
 */
427 static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
429 lb_hb_ctx_t *ps = apr_pcalloc(p, sizeof(lb_hb_ctx_t));
430 lb_hb_ctx_t *base = (lb_hb_ctx_t *) basev;
431 lb_hb_ctx_t *overrides = (lb_hb_ctx_t *) overridesv;
433 if (overrides->path) {
434 ps->path = apr_pstrdup(p, overrides->path);
437 ps->path = apr_pstrdup(p, base->path);
/* Handler for the HeartbeatStorage directive.
 *
 * cmd:   directive context; supplies the pool and the server whose
 *        lb_hb_ctx_t is updated.
 * dconf: not referenced by the statements shown here.
 * path:  directive argument; stored in ctx->path after being resolved
 *        through ap_server_root_relative().
 *
 * The directive's context is checked with
 * ap_check_cmd_context(cmd, GLOBAL_ONLY); how `err` is then used is not
 * visible in this excerpt.
 */
443 static const char *cmd_lb_hb_storage(cmd_parms *cmd,
444 void *dconf, const char *path)
446 apr_pool_t *p = cmd->pool;
448 (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
449 &lbmethod_heartbeat_module);
451 const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
457 ctx->path = ap_server_root_relative(p, path);
/* Directive table of this module: HeartbeatStorage takes one argument
 * (AP_INIT_TAKE1), is declared with RSRC_CONF, and is handled by
 * cmd_lb_hb_storage().
 */
462 static const command_rec cmds[] = {
463 AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
464 "Path to read heartbeat data."),
/* Module record.  It supplies per-server create/merge configuration
 * callbacks, the directive table and the hook registration function; the
 * two per-directory configuration slots are NULL.
 */
468 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module = {
469 STANDARD20_MODULE_STUFF,
470 NULL, /* create per-directory config structure */
471 NULL, /* merge per-directory config structures */
472 lb_hb_create_config, /* create per-server config structure */
473 lb_hb_merge_config, /* merge per-server config structures */
474 cmds, /* command apr_table_t */
475 register_hooks /* register hooks */