]> granicus.if.org Git - apache/blob - modules/proxy/balancers/mod_lbmethod_heartbeat.c
mark some private module data as "static", resolving
[apache] / modules / proxy / balancers / mod_lbmethod_heartbeat.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "mod_proxy.h"
18 #include "scoreboard.h"
19 #include "ap_mpm.h"
20 #include "apr_version.h"
21 #include "apr_hooks.h"
22 #include "ap_slotmem.h"
23 #include "heartbeat.h"
24
25 #ifndef LBM_HEARTBEAT_MAX_LASTSEEN
26 /* If we haven't seen a heartbeat in the last N seconds, don't count this IP
27  * as allive.
28  */
29 #define LBM_HEARTBEAT_MAX_LASTSEEN (10)
30 #endif
31
32 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;
33
34 static const ap_slotmem_provider_t *storage = NULL;
35 static ap_slotmem_instance_t *hm_serversmem = NULL;
36
37 /*
38  * configuration structure
39  * path: path of the file where the heartbeat information is stored.
40  */
41 typedef struct lb_hb_ctx_t
42 {
43     const char *path;
44 } lb_hb_ctx_t;
45
46 typedef struct hb_server_t {
47     const char *ip;
48     int busy;
49     int ready;
50     int seen;
51     int id;
52     proxy_worker *worker;
53 } hb_server_t;
54
55 static void
56 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
57 {
58     char *key;
59     char *value;
60     char *strtok_state;
61     
62     key = apr_strtok(str, "&", &strtok_state);
63     while (key) {
64         value = strchr(key, '=');
65         if (value) {
66             *value = '\0';      /* Split the string in two */
67             value++;            /* Skip passed the = */
68         }
69         else {
70             value = "1";
71         }
72         ap_unescape_url(key);
73         ap_unescape_url(value);
74         apr_table_set(parms, key, value);
75         /*
76          ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
77          "Found query arg: %s = %s", key, value);
78          */
79         key = apr_strtok(NULL, "&", &strtok_state);
80     }
81 }
82
83 static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
84                                     apr_pool_t *pool)
85 {
86     apr_finfo_t fi;
87     apr_status_t rv;
88     apr_file_t *fp;
89
90     if (!path) {
91         return APR_SUCCESS;
92     }
93
94     rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
95                        APR_OS_DEFAULT, pool);
96
97     if (rv) {
98         return rv;
99     }
100
101     rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
102
103     if (rv) {
104         return rv;
105     }
106
107     {
108         char *t;
109         int lineno = 0;
110         apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
111         apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
112         apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
113         apr_table_t *hbt = apr_table_make(pool, 10);
114
115         apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
116
117         do {
118             hb_server_t *server;
119             char buf[4096];
120             apr_size_t bsize = sizeof(buf);
121             const char *ip;
122
123             apr_brigade_cleanup(tmpbb);
124
125             if (APR_BRIGADE_EMPTY(bb)) {
126                 break;
127             }
128
129             rv = apr_brigade_split_line(tmpbb, bb,
130                                         APR_BLOCK_READ, sizeof(buf));
131             lineno++;
132
133             if (rv) {
134                 return rv;
135             }
136
137             apr_brigade_flatten(tmpbb, buf, &bsize);
138
139             if (bsize == 0) {
140                 break;
141             }
142
143             buf[bsize - 1] = 0;
144
145             /* comment */
146             if (buf[0] == '#') {
147                 continue;
148             }
149
150             /* line format: <IP> <query_string>\n */
151             t = strchr(buf, ' ');
152             if (!t) {
153                 continue;
154             }
155             
156             ip = apr_pstrndup(pool, buf, t - buf);
157             t++;
158
159             server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
160             
161             if (server == NULL) {
162                 server = apr_pcalloc(pool, sizeof(hb_server_t));
163                 server->ip = ip;
164                 server->seen = -1;
165
166                 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
167             }
168             
169             apr_table_clear(hbt);
170
171             argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
172
173             if (apr_table_get(hbt, "busy")) {
174                 server->busy = atoi(apr_table_get(hbt, "busy"));
175             }
176
177             if (apr_table_get(hbt, "ready")) {
178                 server->ready = atoi(apr_table_get(hbt, "ready"));
179             }
180
181             if (apr_table_get(hbt, "lastseen")) {
182                 server->seen = atoi(apr_table_get(hbt, "lastseen"));
183             }
184
185             if (server->busy == 0 && server->ready != 0) {
186                 /* Server has zero threads active, but lots of them ready, 
187                  * it likely just started up, so lets /4 the number ready, 
188                  * to prevent us from completely flooding it with all new 
189                  * requests.
190                  */
191                 server->ready = server->ready / 4;
192             }
193
194         } while (1);
195     }
196
197     return APR_SUCCESS;
198 }
199
200 static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
201 {
202     hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
203     apr_hash_t *servers = (apr_hash_t *) data;
204     hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
205     if (server == NULL) {
206         server = apr_pcalloc(pool, sizeof(hb_server_t));
207         server->ip = apr_pstrdup(pool, slotserver->ip);
208         server->seen = -1;
209
210         apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
211
212     }
213     server->busy = slotserver->busy;
214     server->ready = slotserver->ready;
215     server->seen = slotserver->seen;
216     server->id = slotserver->id;
217     if (server->busy == 0 && server->ready != 0) {
218         server->ready = server->ready / 4;
219     }
220     return APR_SUCCESS;
221 }
222 static apr_status_t readslot_heartbeats(apr_hash_t *servers,
223                                     apr_pool_t *pool)
224 {
225     storage->doall(hm_serversmem, hm_read, servers, pool);
226     return APR_SUCCESS;
227 }
228
229 /*
230  * Finding a random number in a range. 
231  *      n' = a + n(b-a+1)/(M+1)
232  * where:
233  *      n' = random number in range
234  *      a  = low end of range
235  *      b  = high end of range
236  *      n  = random number of 0..M
237  *      M  = maxint
238  * Algorithm 'borrowed' from PHP's rand() function.
239  */
240 #define RAND_RANGE(__n, __min, __max, __tmax) \
241 (__n) = (__min) + (long) ((double) ((__max) - (__min) + 1.0) * ((__n) / ((__tmax) + 1.0)))
242
243 static apr_status_t random_pick(apr_uint32_t *number,
244                                 apr_uint32_t min,
245                                 apr_uint32_t max)
246 {
247     apr_status_t rv = 
248         apr_generate_random_bytes((void*)number, sizeof(apr_uint32_t));
249
250     if (rv) {
251         return rv;
252     }
253
254     RAND_RANGE(*number, min, max, APR_UINT32_MAX);
255
256     return APR_SUCCESS;
257 }
258
259 static proxy_worker *find_best_hb(proxy_balancer *balancer,
260                                   request_rec *r)
261 {
262     apr_status_t rv;
263     int i;
264     apr_uint32_t openslots = 0;
265     proxy_worker **worker;
266     hb_server_t *server;
267     apr_array_header_t *up_servers;
268     proxy_worker *mycandidate = NULL;
269     apr_pool_t *tpool;
270     apr_hash_t *servers;
271
272     lb_hb_ctx_t *ctx = 
273         ap_get_module_config(r->server->module_config,
274                              &lbmethod_heartbeat_module);
275
276     apr_pool_create(&tpool, r->pool);
277
278     servers = apr_hash_make(tpool);
279
280     if (hm_serversmem)
281         rv = readslot_heartbeats(servers, tpool);
282     else
283         rv = readfile_heartbeats(ctx->path, servers, tpool);
284
285     if (rv) {
286         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
287                       "lb_heartbeat: Unable to read heartbeats at '%s'",
288                       ctx->path);
289         apr_pool_destroy(tpool);
290         return NULL;
291     }
292
293     up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
294
295     for (i = 0; i < balancer->workers->nelts; i++) {
296         worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
297         server = apr_hash_get(servers, (*worker)->hostname, APR_HASH_KEY_STRING);
298
299         if (!server) {
300             continue;
301         }
302
303         if (!PROXY_WORKER_IS_USABLE(*worker)) {
304             ap_proxy_retry_worker("BALANCER", *worker, r->server);
305         }
306
307         if (PROXY_WORKER_IS_USABLE(*worker)) {
308             server->worker = *worker;
309             if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
310                 openslots += server->ready;
311                 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
312             }
313         }
314     }
315
316     if (openslots > 0) {
317         apr_uint32_t c = 0;
318         apr_uint32_t pick = 0;;
319
320         rv = random_pick(&pick, 0, openslots);
321
322         if (rv) {
323             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
324                           "lb_heartbeat: failed picking a random number. how random.");
325             apr_pool_destroy(tpool);
326             return NULL;
327         }
328
329         for (i = 0; i < up_servers->nelts; i++) {
330             server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
331             if (pick > c && pick <= c + server->ready) {
332                 mycandidate = server->worker;
333             }
334
335             c += server->ready;
336         }
337     }
338
339     apr_pool_destroy(tpool);
340
341     return mycandidate;
342 }
343
344 static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
345         return APR_SUCCESS;
346 }
347
348 static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
349         return APR_SUCCESS;
350 }
351
352 static const proxy_balancer_method heartbeat =
353 {
354     "heartbeat",
355     &find_best_hb,
356     NULL,
357     &reset,
358     &age
359 };
360
361 static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
362                           apr_pool_t *ptemp, server_rec *s)
363 {
364     const char *userdata_key = "mod_lbmethod_heartbeat_init";
365     void *data;
366     apr_size_t size;
367     unsigned int num;
368
369     apr_pool_userdata_get(&data, userdata_key, s->process->pool);
370     if (!data) {
371         /* first call do nothing */
372         apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool);
373         return OK;
374     }
375     storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shared", "0");
376     if (!storage) {
377         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "ap_lookup_provider %s failed", AP_SLOTMEM_PROVIDER_GROUP);
378         return OK;
379     }
380
381     /* Try to use a slotmem created by mod_heartmonitor */
382     storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p);
383     if (!hm_serversmem) {
384         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "No slotmem from mod_heartmonitor");
385     } else
386         ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_NOTICE, 0, s, "Using slotmem from mod_heartmonitor");
387
388     return OK;
389 }
390
391 static void register_hooks(apr_pool_t *p)
392 {
393     static const char * const aszPred[]={ "mod_heartmonitor.c", NULL };
394     ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
395     ap_hook_post_config(lb_hb_init, aszPred, NULL, APR_HOOK_MIDDLE);
396 }
397
398 static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
399 {
400     lb_hb_ctx_t *ctx = (lb_hb_ctx_t *) apr_palloc(p, sizeof(lb_hb_ctx_t));
401     
402     ctx->path = ap_server_root_relative(p, "logs/hb.dat");
403     
404     return ctx;
405 }
406
407 static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
408 {
409     lb_hb_ctx_t *ps = apr_pcalloc(p, sizeof(lb_hb_ctx_t));
410     lb_hb_ctx_t *base = (lb_hb_ctx_t *) basev;
411     lb_hb_ctx_t *overrides = (lb_hb_ctx_t *) overridesv;
412
413     if (overrides->path) {
414         ps->path = apr_pstrdup(p, overrides->path);
415     }
416     else {
417         ps->path = apr_pstrdup(p, base->path);
418     }
419
420     return ps;
421 }
422
423 static const char *cmd_lb_hb_storage(cmd_parms *cmd,
424                                   void *dconf, const char *path)
425 {
426     apr_pool_t *p = cmd->pool;
427     lb_hb_ctx_t *ctx =
428     (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
429                                          &lbmethod_heartbeat_module);
430
431     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
432     
433     if (err != NULL) {
434         return err;
435     }
436
437     ctx->path = ap_server_root_relative(p, path);
438
439     return NULL;
440 }
441
442 static const command_rec cmds[] = {
443     AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
444                   "Path to read heartbeat data."),
445     {NULL}
446 };
447
448 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module = {
449     STANDARD20_MODULE_STUFF,
450     NULL,                       /* create per-directory config structure */
451     NULL,                       /* merge per-directory config structures */
452     lb_hb_create_config,        /* create per-server config structure */
453     lb_hb_merge_config,         /* merge per-server config structures */
454     cmds,                       /* command apr_table_t */
455     register_hooks              /* register hooks */
456 };