]> granicus.if.org Git - apache/blob - modules/proxy/balancers/mod_lbmethod_heartbeat.c
Try to have consistent interface regardless of slotmem or
[apache] / modules / proxy / balancers / mod_lbmethod_heartbeat.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "mod_proxy.h"
18 #include "scoreboard.h"
19 #include "ap_mpm.h"
20 #include "apr_version.h"
21 #include "apr_hooks.h"
22 #include "ap_slotmem.h"
23 #include "heartbeat.h"
24
25 #ifndef LBM_HEARTBEAT_MAX_LASTSEEN
26 /* If we haven't seen a heartbeat in the last N seconds, don't count this IP
27  * as allive.
28  */
29 #define LBM_HEARTBEAT_MAX_LASTSEEN (10)
30 #endif
31
32 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;
33
34 static const ap_slotmem_provider_t *storage = NULL;
35 static ap_slotmem_instance_t *hm_serversmem = NULL;
36
37 /*
38  * configuration structure
39  * path: path of the file where the heartbeat information is stored.
40  */
41 typedef struct lb_hb_ctx_t
42 {
43     const char *path;
44 } lb_hb_ctx_t;
45
46 typedef struct hb_server_t {
47     const char *ip;
48     int busy;
49     int ready;
50     int seen;
51     int port;
52     int id;
53     proxy_worker *worker;
54 } hb_server_t;
55
56 static void
57 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
58 {
59     char *key;
60     char *value;
61     char *strtok_state;
62     
63     key = apr_strtok(str, "&", &strtok_state);
64     while (key) {
65         value = strchr(key, '=');
66         if (value) {
67             *value = '\0';      /* Split the string in two */
68             value++;            /* Skip passed the = */
69         }
70         else {
71             value = "1";
72         }
73         ap_unescape_url(key);
74         ap_unescape_url(value);
75         apr_table_set(parms, key, value);
76         /*
77          ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
78          "Found query arg: %s = %s", key, value);
79          */
80         key = apr_strtok(NULL, "&", &strtok_state);
81     }
82 }
83
84 static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
85                                     apr_pool_t *pool)
86 {
87     apr_finfo_t fi;
88     apr_status_t rv;
89     apr_file_t *fp;
90
91     if (!path) {
92         return APR_SUCCESS;
93     }
94
95     rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
96                        APR_OS_DEFAULT, pool);
97
98     if (rv) {
99         return rv;
100     }
101
102     rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
103
104     if (rv) {
105         return rv;
106     }
107
108     {
109         char *t;
110         int lineno = 0;
111         apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
112         apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
113         apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
114         apr_table_t *hbt = apr_table_make(pool, 10);
115
116         apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
117
118         do {
119             hb_server_t *server;
120             char buf[4096];
121             apr_size_t bsize = sizeof(buf);
122             const char *ip;
123
124             apr_brigade_cleanup(tmpbb);
125
126             if (APR_BRIGADE_EMPTY(bb)) {
127                 break;
128             }
129
130             rv = apr_brigade_split_line(tmpbb, bb,
131                                         APR_BLOCK_READ, sizeof(buf));
132             lineno++;
133
134             if (rv) {
135                 return rv;
136             }
137
138             apr_brigade_flatten(tmpbb, buf, &bsize);
139
140             if (bsize == 0) {
141                 break;
142             }
143
144             buf[bsize - 1] = 0;
145
146             /* comment */
147             if (buf[0] == '#') {
148                 continue;
149             }
150
151             /* line format: <IP> <query_string>\n */
152             t = strchr(buf, ' ');
153             if (!t) {
154                 continue;
155             }
156             
157             ip = apr_pstrndup(pool, buf, t - buf);
158             t++;
159
160             server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
161             
162             if (server == NULL) {
163                 server = apr_pcalloc(pool, sizeof(hb_server_t));
164                 server->ip = ip;
165                 server->port = 80;
166                 server->seen = -1;
167
168                 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
169             }
170             
171             apr_table_clear(hbt);
172
173             argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
174
175             if (apr_table_get(hbt, "busy")) {
176                 server->busy = atoi(apr_table_get(hbt, "busy"));
177             }
178
179             if (apr_table_get(hbt, "ready")) {
180                 server->ready = atoi(apr_table_get(hbt, "ready"));
181             }
182
183             if (apr_table_get(hbt, "lastseen")) {
184                 server->seen = atoi(apr_table_get(hbt, "lastseen"));
185             }
186
187             if (apr_table_get(hbt, "port")) {
188                 server->port = atoi(apr_table_get(hbt, "port"));
189             }
190
191             if (server->busy == 0 && server->ready != 0) {
192                 /* Server has zero threads active, but lots of them ready, 
193                  * it likely just started up, so lets /4 the number ready, 
194                  * to prevent us from completely flooding it with all new 
195                  * requests.
196                  */
197                 server->ready = server->ready / 4;
198             }
199
200         } while (1);
201     }
202
203     return APR_SUCCESS;
204 }
205
206 static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
207 {
208     hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
209     apr_hash_t *servers = (apr_hash_t *) data;
210     hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
211     if (server == NULL) {
212         server = apr_pcalloc(pool, sizeof(hb_server_t));
213         server->ip = apr_pstrdup(pool, slotserver->ip);
214         server->seen = -1;
215
216         apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
217
218     }
219     server->busy = slotserver->busy;
220     server->ready = slotserver->ready;
221     server->seen = slotserver->seen;
222     server->id = slotserver->id;
223     if (server->busy == 0 && server->ready != 0) {
224         server->ready = server->ready / 4;
225     }
226     return APR_SUCCESS;
227 }
228 static apr_status_t readslot_heartbeats(apr_hash_t *servers,
229                                     apr_pool_t *pool)
230 {
231     storage->doall(hm_serversmem, hm_read, servers, pool);
232     return APR_SUCCESS;
233 }
234
235
236 static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers,
237                                         apr_pool_t *pool)
238 {
239     apr_status_t rv;
240     if (hm_serversmem)
241         rv = readslot_heartbeats(servers, pool);
242     else
243         rv = readfile_heartbeats(path, servers, pool);
244     return rv;
245 }
246
247 /*
248  * Finding a random number in a range. 
249  *      n' = a + n(b-a+1)/(M+1)
250  * where:
251  *      n' = random number in range
252  *      a  = low end of range
253  *      b  = high end of range
254  *      n  = random number of 0..M
255  *      M  = maxint
256  * Algorithm 'borrowed' from PHP's rand() function.
257  */
258 #define RAND_RANGE(__n, __min, __max, __tmax) \
259 (__n) = (__min) + (long) ((double) ((__max) - (__min) + 1.0) * ((__n) / ((__tmax) + 1.0)))
260
261 static apr_status_t random_pick(apr_uint32_t *number,
262                                 apr_uint32_t min,
263                                 apr_uint32_t max)
264 {
265     apr_status_t rv = 
266         apr_generate_random_bytes((void*)number, sizeof(apr_uint32_t));
267
268     if (rv) {
269         return rv;
270     }
271
272     RAND_RANGE(*number, min, max, APR_UINT32_MAX);
273
274     return APR_SUCCESS;
275 }
276
277 static proxy_worker *find_best_hb(proxy_balancer *balancer,
278                                   request_rec *r)
279 {
280     apr_status_t rv;
281     int i;
282     apr_uint32_t openslots = 0;
283     proxy_worker **worker;
284     hb_server_t *server;
285     apr_array_header_t *up_servers;
286     proxy_worker *mycandidate = NULL;
287     apr_pool_t *tpool;
288     apr_hash_t *servers;
289
290     lb_hb_ctx_t *ctx = 
291         ap_get_module_config(r->server->module_config,
292                              &lbmethod_heartbeat_module);
293
294     apr_pool_create(&tpool, r->pool);
295
296     servers = apr_hash_make(tpool);
297
298     rv = read_heartbeats(ctx->path, servers, tpool);
299
300     if (rv) {
301         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
302                       "lb_heartbeat: Unable to read heartbeats at '%s'",
303                       ctx->path);
304         apr_pool_destroy(tpool);
305         return NULL;
306     }
307
308     up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
309
310     for (i = 0; i < balancer->workers->nelts; i++) {
311         worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
312         server = apr_hash_get(servers, (*worker)->hostname, APR_HASH_KEY_STRING);
313
314         if (!server) {
315             continue;
316         }
317
318         if (!PROXY_WORKER_IS_USABLE(*worker)) {
319             ap_proxy_retry_worker("BALANCER", *worker, r->server);
320         }
321
322         if (PROXY_WORKER_IS_USABLE(*worker)) {
323             server->worker = *worker;
324             if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
325                 openslots += server->ready;
326                 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
327             }
328         }
329     }
330
331     if (openslots > 0) {
332         apr_uint32_t c = 0;
333         apr_uint32_t pick = 0;
334
335         rv = random_pick(&pick, 0, openslots);
336
337         if (rv) {
338             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
339                           "lb_heartbeat: failed picking a random number. how random.");
340             apr_pool_destroy(tpool);
341             return NULL;
342         }
343
344         for (i = 0; i < up_servers->nelts; i++) {
345             server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
346             if (pick > c && pick <= c + server->ready) {
347                 mycandidate = server->worker;
348             }
349
350             c += server->ready;
351         }
352     }
353
354     apr_pool_destroy(tpool);
355
356     return mycandidate;
357 }
358
359 static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
360         return APR_SUCCESS;
361 }
362
363 static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
364         return APR_SUCCESS;
365 }
366
367 static const proxy_balancer_method heartbeat =
368 {
369     "heartbeat",
370     &find_best_hb,
371     NULL,
372     &reset,
373     &age
374 };
375
376 static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
377                           apr_pool_t *ptemp, server_rec *s)
378 {
379     const char *userdata_key = "mod_lbmethod_heartbeat_init";
380     void *data;
381     apr_size_t size;
382     unsigned int num;
383     lb_hb_ctx_t *ctx = ap_get_module_config(s->module_config,
384                                             &lbmethod_heartbeat_module);
385     
386     apr_pool_userdata_get(&data, userdata_key, s->process->pool);
387     if (!data) {
388         /* first call do nothing */
389         apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool);
390         return OK;
391     }
392     storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shared", "0");
393     if (!storage) {
394         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "ap_lookup_provider %s failed", AP_SLOTMEM_PROVIDER_GROUP);
395         return OK;
396     }
397
398     /* Try to use a slotmem created by mod_heartmonitor */
399     storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p);
400     if (!hm_serversmem) {
401         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "No slotmem from mod_heartmonitor");
402     } else
403         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "Using slotmem from mod_heartmonitor");
404
405     if (hm_serversmem)
406         ctx->path = "(slotmem)";
407
408     return OK;
409 }
410
411 static void register_hooks(apr_pool_t *p)
412 {
413     static const char * const aszPred[]={ "mod_heartmonitor.c", NULL };
414     ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
415     ap_hook_post_config(lb_hb_init, aszPred, NULL, APR_HOOK_MIDDLE);
416 }
417
418 static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
419 {
420     lb_hb_ctx_t *ctx = (lb_hb_ctx_t *) apr_palloc(p, sizeof(lb_hb_ctx_t));
421     
422     ctx->path = ap_server_root_relative(p, "logs/hb.dat");
423     
424     return ctx;
425 }
426
427 static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
428 {
429     lb_hb_ctx_t *ps = apr_pcalloc(p, sizeof(lb_hb_ctx_t));
430     lb_hb_ctx_t *base = (lb_hb_ctx_t *) basev;
431     lb_hb_ctx_t *overrides = (lb_hb_ctx_t *) overridesv;
432
433     if (overrides->path) {
434         ps->path = apr_pstrdup(p, overrides->path);
435     }
436     else {
437         ps->path = apr_pstrdup(p, base->path);
438     }
439
440     return ps;
441 }
442
443 static const char *cmd_lb_hb_storage(cmd_parms *cmd,
444                                   void *dconf, const char *path)
445 {
446     apr_pool_t *p = cmd->pool;
447     lb_hb_ctx_t *ctx =
448     (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
449                                          &lbmethod_heartbeat_module);
450
451     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
452     
453     if (err != NULL) {
454         return err;
455     }
456
457     ctx->path = ap_server_root_relative(p, path);
458
459     return NULL;
460 }
461
462 static const command_rec cmds[] = {
463     AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
464                   "Path to read heartbeat data."),
465     {NULL}
466 };
467
468 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module = {
469     STANDARD20_MODULE_STUFF,
470     NULL,                       /* create per-directory config structure */
471     NULL,                       /* merge per-directory config structures */
472     lb_hb_create_config,        /* create per-server config structure */
473     lb_hb_merge_config,         /* merge per-server config structures */
474     cmds,                       /* command apr_table_t */
475     register_hooks              /* register hooks */
476 };