]> granicus.if.org Git - apache/blob - modules/cluster/mod_heartmonitor.c
07bada1431d6ecd575eb3f9dde39bfbcce0a3b30
[apache] / modules / cluster / mod_heartmonitor.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "httpd.h"
18 #include "http_config.h"
19 #include "http_log.h"
20 #include "http_core.h"
21 #include "http_protocol.h"
22 #include "apr_strings.h"
23 #include "apr_hash.h"
24 #include "apr_time.h"
25 #include "ap_mpm.h"
26 #include "scoreboard.h"
27 #include "mod_watchdog.h"
28 #include "ap_slotmem.h"
29 #include "heartbeat.h"
30
31
32 #ifndef HM_UPDATE_SEC
33 /* How often we update the stats file */
34 /* TODO: Make a runtime config */
35 #define HM_UPDATE_SEC (5)
36 #endif
37
38 #define HM_WATHCHDOG_NAME ("_heartmonitor_")
39
40 static const ap_slotmem_provider_t *storage = NULL;
41 static ap_slotmem_instance_t *slotmem = NULL;
42 static int maxworkers = 0;
43
44 module AP_MODULE_DECLARE_DATA heartmonitor_module;
45
46 typedef struct hm_server_t
47 {
48     const char *ip;
49     int busy;
50     int ready;
51     unsigned int port;
52     apr_time_t seen;
53 } hm_server_t;
54
55 typedef struct hm_ctx_t
56 {
57     int active;
58     const char *storage_path;
59     ap_watchdog_t *watchdog;
60     apr_interval_time_t interval;
61     apr_sockaddr_t *mcast_addr;
62     apr_status_t status;
63     volatile int keep_running;
64     apr_socket_t *sock;
65     apr_pool_t *p;
66     apr_hash_t *servers;
67     server_rec *s;
68 } hm_ctx_t;
69
70 typedef struct hm_slot_server_ctx_t {
71   hm_server_t *s;
72   int found;
73   unsigned int item_id;
74 } hm_slot_server_ctx_t;
75
76 static apr_status_t hm_listen(hm_ctx_t *ctx)
77 {
78     apr_status_t rv;
79
80     rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
81                            SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
82
83     if (rv) {
84         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
85                      "Heartmonitor: Failed to create listening socket.");
86         return rv;
87     }
88
89     rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
90     if (rv) {
91         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
92                      "Heartmonitor: Failed to set APR_SO_REUSEADDR to 1 on socket.");
93         return rv;
94     }
95
96
97     rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
98     if (rv) {
99         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
100                      "Heartmonitor: Failed to set APR_SO_NONBLOCK to 1 on socket.");
101         return rv;
102     }
103
104     rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
105     if (rv) {
106         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
107                      "Heartmonitor: Failed to bind on socket.");
108         return rv;
109     }
110
111     rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
112
113     if (rv) {
114         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
115                      "Heartmonitor: Failed to join multicast group");
116         return rv;
117     }
118
119     rv = apr_mcast_loopback(ctx->sock, 1);
120     if (rv) {
121         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
122                      "Heartmonitor: Failed to accept localhost mulitcast on socket.");
123         return rv;
124     }
125
126     return APR_SUCCESS;
127 }
128
129 /* XXX: The same exists in mod_lbmethod_heartbeat.c where it is named argstr_to_table */
130 static void qs_to_table(const char *input, apr_table_t *parms,
131                         apr_pool_t *p)
132 {
133     char *key;
134     char *value;
135     char *query_string;
136     char *strtok_state;
137
138     if (input == NULL) {
139         return;
140     }
141
142     query_string = apr_pstrdup(p, input);
143
144     key = apr_strtok(query_string, "&", &strtok_state);
145     while (key) {
146         value = strchr(key, '=');
147         if (value) {
148             *value = '\0';      /* Split the string in two */
149             value++;            /* Skip passed the = */
150         }
151         else {
152             value = "1";
153         }
154         ap_unescape_url(key);
155         ap_unescape_url(value);
156         apr_table_set(parms, key, value);
157         /*
158            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
159            "Found query arg: %s = %s", key, value);
160          */
161         key = apr_strtok(NULL, "&", &strtok_state);
162     }
163 }
164
165
166 #define SEEN_TIMEOUT (30)
167
168 /* Store in the slotmem */
169 static apr_status_t hm_update(void* mem, void *data, apr_pool_t *p)
170 {
171     hm_slot_server_t *old = (hm_slot_server_t *) mem;
172     hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
173     hm_server_t *new = s->s;
174     if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
175         s->found = 1;
176         old->busy = new->busy;
177         old->ready = new->ready;
178         old->seen = new->seen;
179     }
180     return APR_SUCCESS;
181 }
182 /* Read the id corresponding to the entry in the slotmem */
183 static apr_status_t hm_readid(void* mem, void *data, apr_pool_t *p)
184 {
185     hm_slot_server_t *old = (hm_slot_server_t *) mem;
186     hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
187     hm_server_t *new = s->s;
188     if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
189         s->found = 1;
190         s->item_id = old->id;
191     }
192     return APR_SUCCESS;
193 }
194 /* update the entry or create it if not existing */
195 static  apr_status_t  hm_slotmem_update_stat(hm_server_t *s, apr_pool_t *pool)
196 {
197     /* We call do_all (to try to update) otherwise grab + put */
198     hm_slot_server_ctx_t ctx;
199     ctx.s = s;
200     ctx.found = 0;
201     storage->doall(slotmem, hm_update, &ctx, pool);
202     if (!ctx.found) {
203         unsigned int i;
204         hm_slot_server_t hmserver;
205         memcpy(hmserver.ip, s->ip, MAXIPSIZE);
206         hmserver.busy = s->busy;
207         hmserver.ready = s->ready;
208         hmserver.seen = s->seen;
209         /* XXX locking for grab() / put() */
210         storage->grab(slotmem, &i);
211         hmserver.id = i;
212         storage->put(slotmem, i, (unsigned char *)&hmserver, sizeof(hmserver));
213     }
214     return APR_SUCCESS;
215 }
216 static  apr_status_t  hm_slotmem_remove_stat(hm_server_t *s, apr_pool_t *pool)
217 {
218     hm_slot_server_ctx_t ctx;
219     ctx.s = s;
220     ctx.found = 0;
221     storage->doall(slotmem, hm_readid, &ctx, pool);
222     if (ctx.found) {
223         storage->release(slotmem, ctx.item_id);
224     }
225     return APR_SUCCESS;
226 }
227 static apr_status_t hm_file_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
228 {
229     apr_status_t rv;
230     apr_file_t *fp;
231     apr_file_t *fpin;
232     apr_time_t now;
233     apr_time_t fage;
234     apr_finfo_t fi;
235     int updated = 0;
236     char *path = apr_pstrcat(pool, ctx->storage_path, ".tmp.XXXXXX", NULL);
237
238
239     /* TODO: Update stats file (!) */
240     rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, pool);
241
242     if (rv) {
243         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
244                      "Heartmonitor: Unable to open tmp file: %s", path);
245         return rv;
246     }
247     rv = apr_file_open(&fpin, ctx->storage_path, APR_READ|APR_BINARY|APR_BUFFERED,
248                        APR_OS_DEFAULT, pool);
249
250     now = apr_time_now();
251     if (rv == APR_SUCCESS) {
252         char *t;
253         apr_table_t *hbt = apr_table_make(pool, 10);
254         apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
255         apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
256         apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
257         rv = apr_file_info_get(&fi, APR_FINFO_SIZE | APR_FINFO_MTIME, fpin);
258         if (rv) {
259             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
260                          "Heartmonitor: Unable to read file: %s", ctx->storage_path);
261             return rv;
262         }
263
264         /* Read the file and update the line corresponding to the node */
265         ba = apr_bucket_alloc_create(pool);
266         bb = apr_brigade_create(pool, ba);
267         apr_brigade_insert_file(bb, fpin, 0, fi.size, pool);
268         tmpbb = apr_brigade_create(pool, ba);
269         fage = apr_time_sec(now - fi.mtime);
270         do {
271             char buf[4096];
272             const char *ip;
273             apr_size_t bsize = sizeof(buf);
274             apr_brigade_cleanup(tmpbb);
275             if (APR_BRIGADE_EMPTY(bb)) {
276                 break;
277             } 
278             rv = apr_brigade_split_line(tmpbb, bb,
279                                         APR_BLOCK_READ, sizeof(buf));
280        
281             if (rv) {
282                 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
283                              "Heartmonitor: Unable to read from file: %s", ctx->storage_path);
284                 return rv;
285             }
286
287             apr_brigade_flatten(tmpbb, buf, &bsize);
288             if (bsize == 0) {
289                 break;
290             }
291             buf[bsize - 1] = 0;
292             t = strchr(buf, ' ');
293             if (t) {
294                 ip = apr_pstrndup(pool, buf, t - buf);
295             } else {
296                 ip = NULL;
297             }
298             if (!ip || buf[0] == '#') {
299                 /* copy things we can't process */
300                 apr_file_printf(fp, "%s\n", buf);
301             } else if (strcmp(ip, s->ip) !=0 ) {
302                 hm_server_t node; 
303                 apr_time_t seen;
304                 /* Update seen time according to the last file modification */
305                 apr_table_clear(hbt);
306                 qs_to_table(apr_pstrdup(pool, t), hbt, pool);
307                 if (apr_table_get(hbt, "busy")) {
308                     node.busy = atoi(apr_table_get(hbt, "busy"));
309                 } else {
310                     node.busy = 0;
311                 }
312
313                 if (apr_table_get(hbt, "ready")) {
314                     node.ready = atoi(apr_table_get(hbt, "ready"));
315                 } else {
316                     node.ready = 0;
317                 }
318
319                 if (apr_table_get(hbt, "lastseen")) {
320                     node.seen = atoi(apr_table_get(hbt, "lastseen"));
321                 } else {
322                     node.seen = SEEN_TIMEOUT; 
323                 }
324                 seen = fage + node.seen;
325
326                 if (apr_table_get(hbt, "port")) {
327                     node.port = atoi(apr_table_get(hbt, "port"));
328                 } else {
329                     node.port = 80; 
330                 }
331                 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
332                                 ip, node.ready, node.busy, (unsigned int) seen, node.port);
333             } else {
334                 apr_time_t seen;
335                 seen = apr_time_sec(now - s->seen);
336                 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
337                                 s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
338                 updated = 1;
339             }
340         } while (1);
341     }
342
343     if (!updated) {
344         apr_time_t seen;
345         seen = apr_time_sec(now - s->seen);
346         apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
347                         s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
348     }
349
350     rv = apr_file_flush(fp);
351     if (rv) {
352       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
353                    "Heartmonitor: Unable to flush file: %s", path);
354       return rv;
355     }
356
357     rv = apr_file_close(fp);
358     if (rv) {
359       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
360                    "Heartmonitor: Unable to close file: %s", path);
361       return rv;
362     }
363   
364     rv = apr_file_perms_set(path,
365                             APR_FPROT_UREAD | APR_FPROT_GREAD |
366                             APR_FPROT_WREAD);
367     if (rv && rv != APR_INCOMPLETE) {
368         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
369                      "Heartmonitor: Unable to set file permssions on %s",
370                      path);
371         return rv;
372     }
373
374     rv = apr_file_rename(path, ctx->storage_path, pool);
375
376     if (rv) {
377         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
378                      "Heartmonitor: Unable to move file: %s -> %s", path,
379                      ctx->storage_path);
380         return rv;
381     }
382
383     return APR_SUCCESS;
384 }
385 static  apr_status_t  hm_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
386 {
387     if (slotmem)
388         return hm_slotmem_update_stat(s, pool);
389     else
390         return hm_file_update_stat(ctx, s, pool);
391 }
392
393 /* Store in a file */
394 static apr_status_t hm_file_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
395 {
396     apr_status_t rv;
397     apr_file_t *fp;
398     apr_hash_index_t *hi;
399     apr_time_t now;
400     char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
401     /* TODO: Update stats file (!) */
402     rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
403
404     if (rv) {
405         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
406                      "Heartmonitor: Unable to open tmp file: %s", path);
407         return rv;
408     }
409
410     now = apr_time_now();
411     for (hi = apr_hash_first(p, ctx->servers);
412          hi != NULL; hi = apr_hash_next(hi)) {
413         hm_server_t *s = NULL;
414         apr_time_t seen;
415         apr_hash_this(hi, NULL, NULL, (void **) &s);
416         seen = apr_time_sec(now - s->seen);
417         if (seen > SEEN_TIMEOUT) {
418             /*
419              * Skip this entry from the heartbeat file -- when it comes back,
420              * we will reuse the memory...
421              */
422         }
423         else {
424             apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
425                             s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
426         }
427     }
428
429     rv = apr_file_flush(fp);
430     if (rv) {
431       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
432                    "Heartmonitor: Unable to flush file: %s", path);
433       return rv;
434     }
435
436     rv = apr_file_close(fp);
437     if (rv) {
438       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
439                    "Heartmonitor: Unable to close file: %s", path);
440       return rv;
441     }
442   
443     rv = apr_file_perms_set(path,
444                             APR_FPROT_UREAD | APR_FPROT_GREAD |
445                             APR_FPROT_WREAD);
446     if (rv && rv != APR_INCOMPLETE) {
447         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
448                      "Heartmonitor: Unable to set file permssions on %s",
449                      path);
450         return rv;
451     }
452
453     rv = apr_file_rename(path, ctx->storage_path, p);
454
455     if (rv) {
456         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
457                      "Heartmonitor: Unable to move file: %s -> %s", path,
458                      ctx->storage_path);
459         return rv;
460     }
461
462     return APR_SUCCESS;
463 }
464 /* Store in a slotmem */
465 static apr_status_t hm_slotmem_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
466 {
467     apr_status_t rv;
468     apr_time_t now;
469     apr_hash_index_t *hi;
470     now = apr_time_now();
471     for (hi = apr_hash_first(p, ctx->servers);
472          hi != NULL; hi = apr_hash_next(hi)) {
473         hm_server_t *s = NULL;
474         apr_time_t seen;
475         apr_hash_this(hi, NULL, NULL, (void **) &s);
476         seen = apr_time_sec(now - s->seen);
477         if (seen > SEEN_TIMEOUT) {
478             /* remove it */
479             rv = hm_slotmem_remove_stat(s, p);
480         } else {
481             /* update it */
482             rv = hm_slotmem_update_stat(s, p);
483         }
484         if (rv !=APR_SUCCESS)
485             return rv;
486     }
487     return APR_SUCCESS;
488 }
489 /* Store/update the stats */
490 static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
491 {
492     if (slotmem)
493         return hm_slotmem_update_stats(ctx, p);
494     else
495         return hm_file_update_stats(ctx, p);
496 }
497
498 static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip, const int port)
499 {
500     hm_server_t *s;
501
502     s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
503
504     if (s == NULL) {
505         s = apr_palloc(ctx->p, sizeof(hm_server_t));
506         s->ip = apr_pstrdup(ctx->p, ip);
507         s->port = port;
508         s->ready = 0;
509         s->busy = 0;
510         s->seen = 0;
511         apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
512     }
513
514     return s;
515 }
516
517 /* Process a message received from a backend node */
518 static void hm_processmsg(hm_ctx_t *ctx, apr_pool_t *p,
519                                   apr_sockaddr_t *from, char *buf, int len)
520 {
521     apr_table_t *tbl;
522
523     buf[len] = '\0';
524
525     tbl = apr_table_make(p, 10);
526
527     qs_to_table(buf, tbl, p);
528
529     if (apr_table_get(tbl, "v") != NULL &&
530         apr_table_get(tbl, "busy") != NULL &&
531         apr_table_get(tbl, "ready") != NULL) {
532         char *ip;
533         int port = 80;
534         hm_server_t *s;
535         /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
536         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s,
537                      "Heartmonitor: %pI busy=%s ready=%s", from,
538                      apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
539
540         apr_sockaddr_ip_get(&ip, from);
541
542         if (apr_table_get(tbl, "port") != NULL)
543             port = atoi(apr_table_get(tbl, "port"));
544            
545         s = hm_get_server(ctx, ip, port);
546
547         s->busy = atoi(apr_table_get(tbl, "busy"));
548         s->ready = atoi(apr_table_get(tbl, "ready"));
549         s->seen = apr_time_now();
550     }
551     else {
552         ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ctx->s,
553                      "Heartmonitor: malformed message from %pI",
554                      from);
555     }
556
557 }
558 /* Read message from multicast socket */
559 #define MAX_MSG_LEN (1000)
560 static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
561 {
562     char buf[MAX_MSG_LEN + 1];
563     apr_sockaddr_t from;
564     apr_size_t len = MAX_MSG_LEN;
565     apr_status_t rv;
566
567     from.pool = p;
568
569     rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
570
571     if (APR_STATUS_IS_EAGAIN(rv)) {
572         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
573                      "Heartmonitor: would block");
574         return APR_SUCCESS;
575     }
576     else if (rv) {
577         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
578                      "Heartmonitor: recvfrom failed");
579         return rv;
580     }
581
582     hm_processmsg(ctx, p, &from, buf, len);
583
584     return rv;
585 }
586
587 static apr_status_t hm_watchdog_callback(int state, void *data,
588                                          apr_pool_t *pool)
589 {
590     apr_status_t rv = APR_SUCCESS;
591     apr_time_t cur, now;
592     hm_ctx_t *ctx = (hm_ctx_t *)data;
593
594     if (!ctx->active) {
595         return rv;
596     }
597
598     switch (state) {
599         case AP_WATCHDOG_STATE_STARTING:
600             rv = hm_listen(ctx);
601             if (rv) {
602                 ctx->status = rv;
603                 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s,
604                              "Heartmonitor: Unable to listen for connections!");
605             }
606             else {
607                 ctx->keep_running = 1;
608                 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s,
609                              "Heartmonitor: %s listener started.",
610                              HM_WATHCHDOG_NAME);
611             }
612         break;
613         case AP_WATCHDOG_STATE_RUNNING:
614             /* store in the slotmem or in the file depending on configuration */
615             hm_update_stats(ctx, pool);
616             cur = now = apr_time_sec(apr_time_now());
617             /* TODO: Insted HN_UPDATE_SEC use
618              * the ctx->interval
619              */
620             while ((now - cur) < apr_time_sec(ctx->interval)) {
621                 int n;
622                 apr_status_t rc;
623                 apr_pool_t *p;
624                 apr_pollfd_t pfd;
625                 apr_interval_time_t timeout;
626
627                 apr_pool_create(&p, pool);
628
629                 pfd.desc_type = APR_POLL_SOCKET;
630                 pfd.desc.s = ctx->sock;
631                 pfd.p = p;
632                 pfd.reqevents = APR_POLLIN;
633
634                 timeout = apr_time_from_sec(1);
635
636                 rc = apr_poll(&pfd, 1, &n, timeout);
637
638                 if (!ctx->keep_running) {
639                     apr_pool_destroy(p);
640                     break;
641                 }
642                 if (rc == APR_SUCCESS && (pfd.rtnevents & APR_POLLIN)) {
643                     hm_recv(ctx, p);
644                 }
645                 now = apr_time_sec(apr_time_now());
646                 apr_pool_destroy(p);
647             }
648         break;
649         case AP_WATCHDOG_STATE_STOPPING:
650             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s,
651                          "Heartmonitor: stopping %s listener.",
652                          HM_WATHCHDOG_NAME);
653
654             ctx->keep_running = 0;
655             if (ctx->sock) {
656                 apr_socket_close(ctx->sock);
657                 ctx->sock = NULL;
658             }
659         break;
660     }
661     return rv;
662 }
663
664 static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
665                           apr_pool_t *ptemp, server_rec *s)
666 {
667     apr_status_t rv;
668     const char *userdata_key = "mod_heartmonitor_init";
669     void *data;
670     hm_ctx_t *ctx = ap_get_module_config(s->module_config,
671                                          &heartmonitor_module);
672     APR_OPTIONAL_FN_TYPE(ap_watchdog_get_instance) *hm_watchdog_get_instance;
673     APR_OPTIONAL_FN_TYPE(ap_watchdog_register_callback) *hm_watchdog_register_callback;
674
675     hm_watchdog_get_instance = APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_get_instance);
676     hm_watchdog_register_callback = APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_register_callback);
677     if (!hm_watchdog_get_instance || !hm_watchdog_register_callback) {
678         ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
679                      "Heartmonitor: mod_watchdog is required");
680         return !OK;
681     }
682
683     /* Create the slotmem */
684     apr_pool_userdata_get(&data, userdata_key, s->process->pool);
685     if (!data) {
686         /* first call do nothing */
687         apr_pool_userdata_set((const void *)1, userdata_key, apr_pool_cleanup_null, s->process->pool);
688     } else {
689         if (maxworkers) {
690             storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shared", "0");
691             if (!storage) {
692                 ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_EMERG, 0, s, "ap_lookup_provider %s failed", AP_SLOTMEM_PROVIDER_GROUP);
693                 return !OK;
694             }
695             storage->create(&slotmem, "mod_heartmonitor", sizeof(hm_slot_server_t), maxworkers, AP_SLOTMEM_TYPE_PREGRAB, p);
696             if (!slotmem) {
697                 ap_log_error(APLOG_MARK, APLOG_NOERRNO|APLOG_EMERG, 0, s, "slotmem_create for status failed");
698                 return !OK;
699             }
700         }
701     }
702
703     if (!ctx->active) {
704         return OK;
705     }
706     rv = hm_watchdog_get_instance(&ctx->watchdog,
707                                   HM_WATHCHDOG_NAME,
708                                   0, 1, p);
709     if (rv) {
710         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
711                      "Heartmonitor: Failed to create watchdog "
712                      "instance (%s)", HM_WATHCHDOG_NAME);
713         return !OK;
714     }
715     /* Register a callback with zero interval. */
716     rv = hm_watchdog_register_callback(ctx->watchdog,
717                                        0,
718                                        ctx,
719                                        hm_watchdog_callback);
720     if (rv) {
721         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
722                      "Heartmonitor: Failed to register watchdog "
723                      "callback (%s)", HM_WATHCHDOG_NAME);
724         return !OK;
725     }
726     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s,
727                  "Heartmonitor: wd callback %s", HM_WATHCHDOG_NAME);
728     return OK;
729 }
730
731 static int hm_handler(request_rec *r)
732 {
733     apr_bucket_brigade *input_brigade;
734     apr_size_t len;
735     char *buf;
736     apr_status_t status;
737     apr_table_t *tbl;
738     hm_server_t hmserver;
739     char *ip;
740     hm_ctx_t *ctx;
741
742     if (strcmp(r->handler, "heartbeat")) {
743         return DECLINED;
744     }
745     if (r->method_number != M_POST) {
746         return HTTP_METHOD_NOT_ALLOWED;
747     }
748
749     len = MAX_MSG_LEN;
750     ctx = ap_get_module_config(r->server->module_config,
751             &heartmonitor_module);
752
753     buf = apr_pcalloc(r->pool, MAX_MSG_LEN);
754     input_brigade = apr_brigade_create(r->connection->pool, r->connection->bucket_alloc);
755     status = ap_get_brigade(r->input_filters, input_brigade, AP_MODE_READBYTES, APR_BLOCK_READ, MAX_MSG_LEN);
756     if (status != APR_SUCCESS) {
757         return HTTP_INTERNAL_SERVER_ERROR;
758     }
759     apr_brigade_flatten(input_brigade, buf, &len);
760
761     /* we can't use hm_processmsg because it uses hm_get_server() */
762     buf[len] = '\0';
763     tbl = apr_table_make(r->pool, 10);
764     qs_to_table(buf, tbl, r->pool);
765     apr_sockaddr_ip_get(&ip, r->connection->remote_addr);
766     hmserver.ip = ip;
767     hmserver.port = 80;
768     if (apr_table_get(tbl, "port") != NULL)
769         hmserver.port = atoi(apr_table_get(tbl, "port"));
770     hmserver.busy = atoi(apr_table_get(tbl, "busy"));
771     hmserver.ready = atoi(apr_table_get(tbl, "ready"));
772     hmserver.seen = apr_time_now();
773     hm_update_stat(ctx, &hmserver, r->pool);
774
775     ap_set_content_type(r, "text/plain");
776     ap_set_content_length(r, 2);
777     ap_rprintf(r, "OK");
778     ap_rflush(r);
779
780     return OK;
781 }
782
783 static void hm_register_hooks(apr_pool_t *p)
784 {
785     static const char * const aszSucc[]={ "mod_proxy.c", NULL };
786     ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
787
788     ap_hook_handler(hm_handler, NULL, aszSucc, APR_HOOK_FIRST);
789 }
790
791 static void *hm_create_config(apr_pool_t *p, server_rec *s)
792 {
793     hm_ctx_t *ctx = (hm_ctx_t *) apr_palloc(p, sizeof(hm_ctx_t));
794
795     ctx->active = 0;
796     ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");
797     /* TODO: Add directive for tuning the update interval
798      */
799     ctx->interval = apr_time_from_sec(HM_UPDATE_SEC);
800     ctx->s = s;
801     apr_pool_create(&ctx->p, p);
802     ctx->servers = apr_hash_make(ctx->p);
803
804     return ctx;
805 }
806
807 static const char *cmd_hm_storage(cmd_parms *cmd,
808                                   void *dconf, const char *path)
809 {
810     apr_pool_t *p = cmd->pool;
811     hm_ctx_t *ctx =
812         (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
813                                           &heartmonitor_module);
814     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
815
816     if (err != NULL) {
817         return err;
818     }
819
820     ctx->storage_path = ap_server_root_relative(p, path);
821
822     return NULL;
823 }
824
825 static const char *cmd_hm_listen(cmd_parms *cmd,
826                                  void *dconf, const char *mcast_addr)
827 {
828     apr_status_t rv;
829     char *host_str;
830     char *scope_id;
831     apr_port_t port = 0;
832     apr_pool_t *p = cmd->pool;
833     hm_ctx_t *ctx =
834         (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
835                                           &heartmonitor_module);
836     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
837
838     if (err != NULL) {
839         return err;
840     }
841
842     if (!ctx->active) {
843         ctx->active = 1;
844     }
845     else {
846         return "HeartbeatListen: May only be specified once.";
847     }
848
849     rv = apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr, cmd->temp_pool);
850
851     if (rv) {
852         return "HeartbeatListen: Unable to parse multicast address.";
853     }
854
855     if (host_str == NULL) {
856         return "HeartbeatListen: No host provided in multicast address";
857     }
858
859     if (port == 0) {
860         return "HeartbeatListen: No port provided in multicast address";
861     }
862
863     rv = apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
864                                p);
865
866     if (rv) {
867         return
868             "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
869     }
870
871     return NULL;
872 }
873
874 static const char *cmd_hm_maxworkers(cmd_parms *cmd,
875                                   void *dconf, const char *data)
876 {
877     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
878
879     if (err != NULL) {
880         return err;
881     }
882
883     maxworkers = atoi(data);
884     if (maxworkers <= 10)
885         return "HeartbeatMaxServers: Should be bigger than 10"; 
886
887     return NULL;
888 }
889
890 static const command_rec hm_cmds[] = {
891     AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
892                   "Address to listen for heartbeat requests"),
893     AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
894                   "Path to store heartbeat data."),
895     AP_INIT_TAKE1("HeartbeatMaxServers", cmd_hm_maxworkers, NULL, RSRC_CONF,
896                   "Max number of servers when using slotmem (instead file) to store heartbeat data."),
897     {NULL}
898 };
899
900 AP_DECLARE_MODULE(heartmonitor) = {
901     STANDARD20_MODULE_STUFF,
902     NULL,                       /* create per-directory config structure */
903     NULL,                       /* merge per-directory config structures */
904     hm_create_config,           /* create per-server config structure */
905     NULL,                       /* merge per-server config structures */
906     hm_cmds,                    /* command apr_table_t */
907     hm_register_hooks
908 };