]> granicus.if.org Git - apache/blob - modules/proxy/mod_serf.c
spelling
[apache] / modules / proxy / mod_serf.c
1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "mod_serf.h"
18
19 #include "httpd.h"
20 #include "http_core.h"
21 #include "http_config.h"
22 #include "http_protocol.h"
23 #include "http_request.h"
24 #include "http_log.h"
25
26 #include "serf.h"
27 #include "apr_uri.h"
28 #include "apr_strings.h"
29 #include "apr_version.h"
30 #include "ap_mpm.h"
31
32 module AP_MODULE_DECLARE_DATA serf_module;
33 static int mpm_supprts_serf = 0;
34
35 typedef struct {
36     int on;
37     int preservehost;
38     apr_uri_t url;
39 } serf_config_t;
40
41 typedef struct {
42   const char *name;
43   const char *provider;
44   apr_table_t *params;
45 } serf_cluster_t;
46
47 typedef struct {
48   /* name -> serf_cluster_t* */
49   apr_hash_t *clusters;
50 } serf_server_config_t;
51
52 typedef struct {
53     int rstatus;
54     int want_ssl;
55     int done_headers;
56     int keep_reading;
57     request_rec *r;
58     apr_pool_t *serf_pool;
59     apr_bucket_brigade *tmpbb;
60     serf_config_t *conf;
61     serf_ssl_context_t *ssl_ctx;
62     serf_bucket_alloc_t *bkt_alloc;
63     serf_bucket_t *body_bkt;
64 } s_baton_t;
65
66 #if !APR_VERSION_AT_LEAST(1,4,0)
67 #define apr_time_from_msec(x) (x * 1000)
68 #endif
69
70 /**
71  * This works right now because all timers are invoked in the single listener
72  * thread in the Event MPM -- the same thread that serf callbacks are made
73  * from, so we don't technically need a mutex yet, but with the Simple MPM,
74  * invocations are made from worker threads, and we need to figure out locking
75  */
76 static void timed_cleanup_callback(void *baton)
77 {
78     s_baton_t *ctx = baton;
79
80     /* Causes all serf connections to unregister from the event mpm */
81     if (ctx->rstatus) {
82         ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r, APLOGNO(01119)
83                       "serf: request returned: %d", ctx->rstatus);
84         ctx->r->status = HTTP_OK;
85         apr_pool_destroy(ctx->serf_pool);
86         ap_die(ctx->rstatus, ctx->r);
87     }
88     else {
89         apr_bucket *e;
90         apr_brigade_cleanup(ctx->tmpbb);
91         e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
92         APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
93         e = apr_bucket_eos_create(ctx->r->connection->bucket_alloc);
94         APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
95
96         /* TODO: return code? bleh */
97         ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
98
99         apr_pool_destroy(ctx->serf_pool);
100
101         ap_finalize_request_protocol(ctx->r);
102         ap_process_request_after_handler(ctx->r);
103         return;
104     }
105 }
106
107 static void closed_connection(serf_connection_t *conn,
108                               void *closed_baton,
109                               apr_status_t why,
110                               apr_pool_t *pool)
111 {
112     s_baton_t *ctx = closed_baton;
113
114     if (why) {
115         /* justin says that error handling isn't done yet. hah. */
116         /* XXXXXX: review */
117         ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, APLOGNO(01120) "Closed Connection Error");
118         ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
119     }
120
121     if (mpm_supprts_serf) {
122         ap_mpm_register_timed_callback(apr_time_from_msec(1),
123                                        timed_cleanup_callback, ctx);
124     }
125     ctx->keep_reading = 0;
126 }
127
128 static serf_bucket_t* conn_setup(apr_socket_t *sock,
129                                  void *setup_baton,
130                                  apr_pool_t *pool)
131 {
132     serf_bucket_t *c;
133     s_baton_t *ctx = setup_baton;
134
135     c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
136     if (ctx->want_ssl) {
137         c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
138     }
139
140     return c;
141 }
142
143 static int copy_headers_in(void *vbaton, const char *key, const char *value)
144 {
145     serf_bucket_t *hdrs_bkt = (serf_bucket_t *)vbaton;
146
147     /* XXXXX: List of headers not to copy to serf. serf's serf_bucket_headers_setn,
148      * doesn't actually overwrite a header if we set it once, so we need to ignore anything
149      * we might want to toggle or combine.
150      */
151     switch (key[0]) {
152     case 'a':
153     case 'A':
154         if (strcasecmp("Accept-Encoding", key) == 0) {
155             return 0;
156         }
157         break;
158     case 'c':
159     case 'C':
160         if (strcasecmp("Connection", key) == 0) {
161             return 0;
162         }
163         break;
164     case 'h':
165     case 'H':
166         if (strcasecmp("Host", key) == 0) {
167             return 0;
168         }
169         break;
170     case 'k':
171     case 'K':
172         if (strcasecmp("Keep-Alive", key) == 0) {
173             return 0;
174         }
175         break;
176     case 't':
177     case 'T':
178         if (strcasecmp("TE", key) == 0) {
179             return 0;
180         }
181         if (strcasecmp("Trailer", key) == 0) {
182             return 0;
183         }
184         break;
185     case 'u':
186     case 'U':
187         if (strcasecmp("Upgrade", key) == 0) {
188             return 0;
189         }
190         break;
191     default:
192         break;
193     }
194
195     serf_bucket_headers_setn(hdrs_bkt, key, value);
196     return 0;
197 }
198
199 static int copy_headers_out(void *vbaton, const char *key, const char *value)
200 {
201     s_baton_t *ctx = vbaton;
202     int done = 0;
203
204     /* XXXXX: Special Treatment required for MANY other headers. fixme.*/
205     switch (key[0]) {
206     case 'c':
207     case 'C':
208         if (strcasecmp("Content-Type", key) == 0) {
209             ap_set_content_type(ctx->r, value);
210             done = 1;
211             break;
212         }
213         else if (strcasecmp("Connection", key) == 0) {
214             done = 1;
215             break;
216         }
217         else if (strcasecmp("Content-Encoding", key) == 0) {
218             done = 1;
219             break;
220         }
221         else if (strcasecmp("Content-Length", key) == 0) {
222             done = 1;
223             break;
224         }
225         break;
226     case 't':
227     case 'T':
228         if (strcasecmp("Transfer-Encoding", key) == 0) {
229             done = 1;
230             break;
231         }
232         break;
233     default:
234             break;
235     }
236
237     if (!done) {
238         apr_table_addn(ctx->r->headers_out, key, value);
239     }
240
241     return 0;
242 }
243
244 static serf_bucket_t* accept_response(serf_request_t *request,
245                                       serf_bucket_t *stream,
246                                       void *acceptor_baton,
247                                       apr_pool_t *pool)
248 {
249     serf_bucket_t *c;
250     serf_bucket_alloc_t *bkt_alloc;
251
252     /* get the per-request bucket allocator */
253     bkt_alloc = serf_request_get_alloc(request);
254
255     /* Create a barrier so the response doesn't eat us! */
256     c = serf_bucket_barrier_create(stream, bkt_alloc);
257
258     return serf_bucket_response_create(c, bkt_alloc);
259 }
260
261 static apr_status_t handle_response(serf_request_t *request,
262                                     serf_bucket_t *response,
263                                     void *vbaton,
264                                     apr_pool_t *pool)
265 {
266     apr_status_t rv;
267     s_baton_t *ctx = vbaton;
268     const char *data;
269     apr_size_t len;
270     serf_status_line sl;
271
272     if (response == NULL) {
273         ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
274         return APR_EGENERAL;
275     }
276
277     /* XXXXXXX: Create better error message. */
278     rv = serf_bucket_response_status(response, &sl);
279     if (rv) {
280         if (APR_STATUS_IS_EAGAIN(rv)) {
281             return APR_SUCCESS;
282         }
283
284         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, APLOGNO(01121) "serf_bucket_response_status...");
285
286         ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
287
288         if (mpm_supprts_serf) {
289             ap_mpm_register_timed_callback(apr_time_from_msec(1),
290                                            timed_cleanup_callback, ctx);
291         }
292
293         return rv;
294     }
295
296     /**
297      * XXXXX: If I understood serf buckets better, it might be possible to not
298      * copy all of the data here, and better stream it to the client.
299      **/
300
301     do {
302         apr_brigade_cleanup(ctx->tmpbb);
303         rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
304
305         if (SERF_BUCKET_READ_ERROR(rv)) {
306             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, APLOGNO(01122) "serf_bucket_read(response)");
307             return rv;
308         }
309
310         if (!ctx->done_headers) {
311             serf_bucket_t *hdrs;
312             serf_status_line line;
313
314             /* TODO: improve */
315             serf_bucket_response_status(response, &line);
316             ctx->r->status = line.code;
317
318             hdrs = serf_bucket_response_get_headers(response);
319             serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
320             ctx->done_headers = 1;
321         }
322
323
324         if (len > 0) {
325             /* TODO: make APR bucket <-> serf bucket stuff more magical. */
326             apr_brigade_write(ctx->tmpbb, NULL, NULL, data, len);
327         }
328
329         if (APR_STATUS_IS_EOF(rv)) {
330             ctx->keep_reading = 0;
331
332             ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
333
334             if (mpm_supprts_serf) {
335                 ap_mpm_register_timed_callback(apr_time_from_msec(1),
336                                                timed_cleanup_callback, ctx);
337             }
338             return APR_EOF;
339         }
340
341         ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
342
343         /* XXXX: Should we send a flush now? */
344         if (APR_STATUS_IS_EAGAIN(rv)) {
345             return APR_SUCCESS;
346         }
347
348     } while (1);
349 }
350
351
352 static apr_status_t setup_request(serf_request_t *request,
353                                   void *vbaton,
354                                   serf_bucket_t **req_bkt,
355                                   serf_response_acceptor_t *acceptor,
356                                   void **acceptor_baton,
357                                   serf_response_handler_t *handler,
358                                   void **handler_baton,
359                                   apr_pool_t *pool)
360 {
361     s_baton_t *ctx = vbaton;
362     serf_bucket_t *hdrs_bkt;
363
364     *req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri,
365                                           ctx->body_bkt,
366                                           serf_request_get_alloc(request));
367
368     hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
369
370     apr_table_do(copy_headers_in, hdrs_bkt, ctx->r->headers_in, NULL);
371
372     if (ctx->conf->preservehost) {
373         serf_bucket_headers_setn(hdrs_bkt, "Host",
374                                  apr_table_get(ctx->r->headers_in, "Host"));
375     }
376     else {
377         serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->conf->url.hostname);
378     }
379
380     serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
381
382     if (ctx->want_ssl) {
383         if (ctx->ssl_ctx == NULL) {
384             *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
385                                            ctx->bkt_alloc);
386             ctx->ssl_ctx = serf_bucket_ssl_encrypt_context_get(*req_bkt);
387         }
388         else {
389             *req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, ctx->ssl_ctx,
390                                                       ctx->bkt_alloc);
391         }
392     }
393
394     *acceptor = accept_response;
395     *acceptor_baton = ctx;
396     *handler = handle_response;
397     *handler_baton = ctx;
398
399     return APR_SUCCESS;
400 }
401
402 /* TOOD: rewrite drive_serf to make it async */
403 static int drive_serf(request_rec *r, serf_config_t *conf)
404 {
405     apr_status_t rv = 0;
406     apr_pool_t *pool;
407     apr_sockaddr_t *address;
408     s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
409     /* XXXXX: make persistent/per-process or something.*/
410     serf_context_t *serfme;
411     serf_connection_t *conn;
412     serf_server_config_t *ctx =
413         (serf_server_config_t *)ap_get_module_config(r->server->module_config,
414                                                      &serf_module);
415
416     /* Allocate everything out of a subpool, with a shorter lifetime than
417      * the main request, so that we can cleanup safely and remove our events
418      * from the main serf context in the async mpm mode.
419      */
420     apr_pool_create(&pool, r->pool);
421     if (strcmp(conf->url.scheme, "cluster") == 0) {
422         int rc;
423         ap_serf_cluster_provider_t *cp;
424         serf_cluster_t *cluster;
425         apr_array_header_t *servers = NULL;
426         apr_uint32_t pick = 0;
427         ap_serf_server_t *choice;
428
429         /* TODO: could this be optimized in post-config to pre-setup the
430          * pointers to the right cluster inside the conf structure?
431          */
432         cluster = apr_hash_get(ctx->clusters,
433                                conf->url.hostname,
434                                APR_HASH_KEY_STRING);
435         if (!cluster) {
436             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01123)
437                           "SerfCluster: unable to find cluster %s", conf->url.hostname);
438             return HTTP_INTERNAL_SERVER_ERROR;
439         }
440
441         cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
442
443         if (cp == NULL) {
444             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01124)
445                           "SerfCluster: unable to find provider %s", cluster->provider);
446             return HTTP_INTERNAL_SERVER_ERROR;
447         }
448
449         if (cp->list_servers == NULL) {
450             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01125)
451                           "SerfCluster: %s is missing list servers provider.", cluster->provider);
452             return HTTP_INTERNAL_SERVER_ERROR;
453         }
454
455         rc = cp->list_servers(cp->baton,
456                               r,
457                               cluster->params,
458                               &servers);
459
460         if (rc != OK) {
461             ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, APLOGNO(01126)
462                           "SerfCluster: %s list servers returned failure", cluster->provider);
463             return HTTP_INTERNAL_SERVER_ERROR;
464         }
465
466         if (servers == NULL || apr_is_empty_array(servers)) {
467             ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, APLOGNO(01127)
468                           "SerfCluster: %s failed to provide a list of servers", cluster->provider);
469             return HTTP_INTERNAL_SERVER_ERROR;
470         }
471
472         /* TOOD: restructure try all servers in the array !! */
473         pick = ap_random_pick(0, servers->nelts-1);
474         choice = APR_ARRAY_IDX(servers, pick, ap_serf_server_t *);
475
476         rv = apr_sockaddr_info_get(&address, choice->ip,
477                                    APR_UNSPEC, choice->port, 0,
478                                    pool);
479     }
480     else {
481         /* XXXXX: cache dns? */
482         rv = apr_sockaddr_info_get(&address, conf->url.hostname,
483                                    APR_UNSPEC, conf->url.port, 0,
484                                    pool);
485     }
486
487     if (rv != APR_SUCCESS) {
488         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01128) "Unable to resolve: %s", conf->url.hostname);
489         return HTTP_INTERNAL_SERVER_ERROR;
490     }
491
492     if (mpm_supprts_serf) {
493         serfme = ap_lookup_provider("mpm_serf", "instance", "0");
494         if (!serfme) {
495             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01129) "mpm lied to us about supporting serf.");
496             return HTTP_INTERNAL_SERVER_ERROR;
497         }
498     }
499     else {
500         serfme = serf_context_create(pool);
501     }
502
503     baton->r = r;
504     baton->conf = conf;
505     baton->serf_pool = pool;
506     baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
507     baton->body_bkt = NULL;
508     baton->ssl_ctx = NULL;
509     baton->rstatus = OK;
510
511     baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
512     baton->done_headers = 0;
513     baton->keep_reading = 1;
514
515     if (strcasecmp(conf->url.scheme, "https") == 0) {
516         baton->want_ssl = 1;
517     }
518     else {
519         baton->want_ssl = 0;
520     }
521
522     rv = ap_setup_client_block(baton->r, REQUEST_CHUNKED_DECHUNK);
523     if (rv) {
524         return rv;
525     }
526
527     /* TODO: create custom serf bucket, which does async request body reads */
528     if (ap_should_client_block(r)) {
529         apr_size_t len;
530         apr_off_t flen = 0;
531         char buf[AP_IOBUFSIZE];
532         apr_file_t *fp;
533
534         rv = apr_file_mktemp(&fp, "mod_serf_buffer.XXXXXX", 0, pool);
535         if (rv != APR_SUCCESS) {
536             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01130) "Unable to create temp request body buffer file.");
537             return HTTP_INTERNAL_SERVER_ERROR;
538         }
539
540         do {
541             len = sizeof(buf);
542             rv = ap_get_client_block(baton->r, buf, len);
543             if (rv > 0) {
544                 rv = apr_file_write_full(fp, buf, rv, NULL);
545                 if (rv) {
546                     ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01131) "failed to read request body");
547                     return HTTP_INTERNAL_SERVER_ERROR;
548                 }
549             }
550         } while(rv > 0);
551
552         if (rv < 0) {
553             ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01132) "failed to read request body");
554             return HTTP_INTERNAL_SERVER_ERROR;
555         }
556
557         apr_file_seek(fp, APR_SET, &flen);
558         baton->body_bkt = serf_bucket_file_create(fp, baton->bkt_alloc);
559     }
560
561     conn = serf_connection_create(serfme, address,
562                                   conn_setup, baton,
563                                   closed_connection, baton,
564                                   pool);
565
566     /* XXX: Is it correct that we don't use the returned serf_request_t? */
567     serf_connection_request_create(conn, setup_request, baton);
568
569     if (mpm_supprts_serf) {
570         return SUSPENDED;
571     }
572     else {
573         do {
574             rv = serf_context_run(serfme, SERF_DURATION_FOREVER, pool);
575
576             /* XXXX: Handle timeouts */
577             if (APR_STATUS_IS_TIMEUP(rv)) {
578                 continue;
579             }
580
581             if (rv != APR_SUCCESS) {
582                 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01133) "serf_context_run() for %pI", address);
583                 return HTTP_INTERNAL_SERVER_ERROR;
584             }
585
586             serf_debug__closed_conn(baton->bkt_alloc);
587         } while (baton->keep_reading);
588
589         return baton->rstatus;
590     }
591 }
592
593 static int serf_handler(request_rec *r)
594 {
595     serf_config_t *conf = ap_get_module_config(r->per_dir_config,
596                                                &serf_module);
597
598     if (conf->on == 0) {
599         return DECLINED;
600     }
601
602     return drive_serf(r, conf);
603 }
604
605 static int is_true(const char *w)
606 {
607     if (strcasecmp(w, "on") == 0 || strcmp(w, "1") == 0 ||
608         strcasecmp(w, "true") == 0)
609     {
610         return 1;
611     }
612
613     return 0;
614 }
615 static const char *add_pass(cmd_parms *cmd, void *vconf,
616                             int argc, char *const argv[])
617 {
618     int i;
619     apr_status_t rv;
620     serf_config_t *conf = (serf_config_t *) vconf;
621
622     if (argc < 1) {
623         return "SerfPass must have at least a URI.";
624     }
625
626     rv = apr_uri_parse(cmd->pool, argv[0], &conf->url);
627
628     if (rv != APR_SUCCESS) {
629         return "Unable to parse SerfPass url.";
630     }
631
632     if (!conf->url.scheme) {
633         return "Need scheme part in url.";
634     }
635
636     /* XXXX: These are bugs in apr_uri_parse. Fixme. */
637     if (!conf->url.port) {
638         conf->url.port = apr_uri_port_of_scheme(conf->url.scheme);
639     }
640
641     if (!conf->url.path) {
642         conf->url.path = "/";
643     }
644
645     for (i = 1; i < argc; i++) {
646         const char *p = argv[i];
647         const char *x = ap_strchr_c(p, '=');
648
649         if (x) {
650             if (strncmp(p, "preservehost", x-p) == 0) {
651                 conf->preservehost = is_true(x+1);
652             }
653         }
654     }
655
656     conf->on = 1;
657
658     return NULL;
659 }
660
661 /* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
662
663 static const char *add_cluster(cmd_parms *cmd, void *d,
664                                int argc, char *const argv[])
665 {
666     const char *rv;
667     ap_serf_cluster_provider_t *backend;
668     int i;
669     serf_cluster_t *cluster = NULL;
670     serf_server_config_t *ctx =
671         (serf_server_config_t *)ap_get_module_config(cmd->server->module_config,
672                                                      &serf_module);
673
674     const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
675
676     if (err != NULL) {
677         return err;
678     }
679
680     if (argc < 2) {
681         return "SerfCluster must have at least a name and provider.";
682     }
683
684     cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t));
685     cluster->name = apr_pstrdup(cmd->pool, argv[0]);
686     cluster->provider = apr_pstrdup(cmd->pool, argv[1]);
687     cluster->params = apr_table_make(cmd->pool, 6);
688
689     backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
690
691     if (backend == NULL) {
692         return apr_psprintf(cmd->pool, "SerfCluster: unable to find "
693                             "provider '%s'", cluster->provider);
694     }
695
696     for (i = 2; i < argc; i++) {
697         const char *p = argv[i];
698         const char *x = ap_strchr_c(p, '=');
699
700         if (x && strlen(p) > 1) {
701             apr_table_addn(cluster->params,
702                            apr_pstrndup(cmd->pool, p, x-p),
703                            x+1);
704         }
705         else {
706             apr_table_addn(cluster->params,
707                            apr_pstrdup(cmd->pool, p),
708                            "");
709         }
710     }
711
712     if (backend->check_config == NULL) {
713         return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to "
714                              "provider a configuration checker",
715                             cluster->provider);
716     }
717
718     rv = backend->check_config(backend->baton, cmd, cluster->params);
719
720     if (rv) {
721         return rv;
722     }
723
724     apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster);
725
726     return NULL;
727 }
728
729 static void *create_dir_config(apr_pool_t *p, char *dummy)
730 {
731     serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t));
732     new->on = 0;
733     new->preservehost = 1;
734     return new;
735 }
736
737 static void *create_server_config(apr_pool_t *p, server_rec *s)
738 {
739     serf_server_config_t *ctx =
740         (serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t));
741
742     ctx->clusters = apr_hash_make(p);
743
744     return ctx;
745 }
746
747 static void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv)
748 {
749     serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t));
750     serf_server_config_t *base = (serf_server_config_t *) basev;
751     serf_server_config_t *overrides = (serf_server_config_t *) overridesv;
752
753     ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters);
754     return ctx;
755 }
756
757 static const command_rec serf_cmds[] =
758 {
759     AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF,
760                       "Configure a cluster backend"),
761     AP_INIT_TAKE_ARGV("SerfPass", add_pass, NULL, OR_INDEXES,
762                       "URL to reverse proxy to"),
763     {NULL}
764 };
765
766 typedef struct hb_table_baton_t {
767     apr_pool_t *p;
768     const char *msg;
769 } hb_table_baton_t;
770
771 static int hb_table_check(void *rec, const char *key, const char *value)
772 {
773     hb_table_baton_t *b = (hb_table_baton_t*)rec;
774     if (strcmp(key, "path") != 0) {
775         b->msg = apr_psprintf(b->p,
776                               "SerfCluster Heartbeat Invalid parameter '%s'",
777                               key);
778         return 1;
779     }
780
781     return 0;
782 }
783
784 static const char* hb_config_check(void *baton,
785                                    cmd_parms *cmd,
786                                    apr_table_t *params)
787 {
788     hb_table_baton_t b;
789
790     if (apr_is_empty_table(params)) {
791         return "SerfCluster Heartbeat requires a path to the heartbeat information.";
792     }
793
794     b.p = cmd->pool;
795     b.msg = NULL;
796
797     apr_table_do(hb_table_check, &b, params, NULL);
798
799     if (b.msg) {
800         return b.msg;
801     }
802
803     return NULL;
804 }
805
806 typedef struct hb_server_t {
807     const char *ip;
808     int busy;
809     int ready;
810     int seen;
811     unsigned int port;
812 } hb_server_t;
813
814 static void
815 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
816 {
817     char *key;
818     char *value;
819     char *strtok_state;
820
821     key = apr_strtok(str, "&", &strtok_state);
822     while (key) {
823         value = strchr(key, '=');
824         if (value) {
825             *value = '\0';      /* Split the string in two */
826             value++;            /* Skip passed the = */
827         }
828         else {
829             value = "1";
830         }
831         ap_unescape_url(key);
832         ap_unescape_url(value);
833         apr_table_set(parms, key, value);
834         key = apr_strtok(NULL, "&", &strtok_state);
835     }
836 }
837
838 static apr_status_t read_heartbeats(const char *path,
839                                     apr_array_header_t *servers,
840                                     apr_pool_t *pool)
841 {
842     apr_finfo_t fi;
843     apr_status_t rv;
844     apr_file_t *fp;
845
846     if (!path) {
847         return APR_SUCCESS;
848     }
849
850     rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
851                        APR_OS_DEFAULT, pool);
852
853     if (rv) {
854         return rv;
855     }
856
857     rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
858
859     if (rv) {
860         return rv;
861     }
862
863     {
864         char *t;
865         int lineno = 0;
866         apr_table_t *hbt = apr_table_make(pool, 10);
867         char buf[4096];
868
869         while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) {
870             hb_server_t *server;
871             const char *ip;
872             lineno++;
873
874             /* comment */
875             if (buf[0] == '#') {
876                 continue;
877             }
878
879
880             /* line format: <IP> <query_string>\n */
881             t = strchr(buf, ' ');
882             if (!t) {
883                 continue;
884             }
885
886             ip = apr_pstrndup(pool, buf, t - buf);
887             t++;
888             server = apr_pcalloc(pool, sizeof(hb_server_t));
889             server->ip = ip;
890             server->port = 80;
891             server->seen = -1;
892             apr_table_clear(hbt);
893
894             argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
895
896             if (apr_table_get(hbt, "busy")) {
897                 server->busy = atoi(apr_table_get(hbt, "busy"));
898             }
899
900             if (apr_table_get(hbt, "ready")) {
901                 server->ready = atoi(apr_table_get(hbt, "ready"));
902             }
903
904             if (apr_table_get(hbt, "lastseen")) {
905                 server->seen = atoi(apr_table_get(hbt, "lastseen"));
906             }
907
908             if (apr_table_get(hbt, "port")) {
909                 server->port = atoi(apr_table_get(hbt, "port"));
910             }
911
912             if (server->busy == 0 && server->ready != 0) {
913                 /* Server has zero threads active, but lots of them ready,
914                  * it likely just started up, so lets /4 the number ready,
915                  * to prevent us from completely flooding it with all new
916                  * requests.
917                  */
918                 server->ready = server->ready / 4;
919             }
920
921             APR_ARRAY_PUSH(servers, hb_server_t *) = server;
922         }
923     }
924
925     return APR_SUCCESS;
926 }
927
928 static int hb_server_sort(const void *a_, const void *b_)
929 {
930     hb_server_t *a = (hb_server_t*)a_;
931     hb_server_t *b = (hb_server_t*)b_;
932     if (a->ready == b->ready) {
933         return 0;
934     }
935     else if (a->ready > b->ready) {
936         return -1;
937     }
938     else {
939         return 1;
940     }
941 }
942
943 static int hb_list_servers(void *baton,
944                            request_rec *r,
945                            apr_table_t *params,
946                            apr_array_header_t **out_servers)
947 {
948     int i;
949     hb_server_t *hbs;
950     apr_status_t rv;
951     apr_pool_t *tpool;
952     apr_array_header_t *tmpservers;
953     apr_array_header_t *servers;
954     const char *path = apr_table_get(params, "path");
955
956     apr_pool_create(&tpool, r->pool);
957
958     path = ap_server_root_relative(tpool, path);
959
960     tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *));
961     rv = read_heartbeats(path, tmpservers, tpool);
962
963     if (rv) {
964         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01134)
965                       "SerfCluster: Heartbeat unable to read '%s'", path);
966         apr_pool_destroy(tpool);
967         return HTTP_INTERNAL_SERVER_ERROR;
968     }
969
970     qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *),
971           hb_server_sort);
972
973     servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *));
974     for (i = 0;
975          i < tmpservers->nelts;
976          i++)
977     {
978         ap_serf_server_t *x;
979
980         hbs = APR_ARRAY_IDX(tmpservers, i, hb_server_t *);
981         if (hbs->ready > 0) {
982             x = apr_palloc(r->pool, sizeof(ap_serf_server_t));
983             x->ip = apr_pstrdup(r->pool, hbs->ip);
984             x->port = hbs->port;
985             APR_ARRAY_PUSH(servers, ap_serf_server_t *) = x;
986         }
987     }
988
989     *out_servers = servers;
990     apr_pool_destroy(tpool);
991     return OK;
992 }
993
994 static const ap_serf_cluster_provider_t builtin_heartbeat =
995 {
996     "heartbeat",
997     NULL,
998     &hb_config_check,
999     &hb_list_servers,
1000     NULL,
1001     NULL
1002 };
1003
1004 static int static_table_check(void *rec, const char *key, const char *value)
1005 {
1006     hb_table_baton_t *b = (hb_table_baton_t*)rec;
1007     if (strcmp(key, "hosts") != 0 &&
1008         strcmp(key, "order") != 0) {
1009         b->msg = apr_psprintf(b->p,
1010                               "SerfCluster Static Invalid parameter '%s'",
1011                               key);
1012         return 1;
1013     }
1014
1015     return 0;
1016 }
1017
1018 static const char* static_config_check(void *baton,
1019                                    cmd_parms *cmd,
1020                                    apr_table_t *params)
1021 {
1022     hb_table_baton_t b;
1023
1024     if (apr_is_empty_table(params)) {
1025         return "SerfCluster Static requires at least a host list.";
1026     }
1027
1028     b.p = cmd->pool;
1029     b.msg = NULL;
1030
1031     apr_table_do(static_table_check, &b, params, NULL);
1032
1033     if (b.msg) {
1034         return b.msg;
1035     }
1036
1037     if (apr_table_get(params, "hosts") == NULL) {
1038         return "SerfCluster Static requires at least a hosts parameter";
1039     }
1040     return NULL;
1041 }
1042
1043 static int static_list_servers(void *baton,
1044                                request_rec *r,
1045                                apr_table_t *params,
1046                                apr_array_header_t **out_servers)
1047 {
1048     apr_status_t rv;
1049     char *ip;
1050     char *strtok_state;
1051     apr_array_header_t *servers;
1052     const char *hosts = apr_table_get(params, "hosts");
1053     const char *order = apr_table_get(params, "order");
1054
1055     servers = apr_array_make(r->pool, 10, sizeof(ap_serf_server_t *));
1056
1057     ip = apr_strtok(apr_pstrdup(r->pool, hosts), ",", &strtok_state);
1058     while (ip) {
1059         char *host_str;
1060         char *scope_id;
1061         apr_port_t port = 0;
1062
1063         rv = apr_parse_addr_port(&host_str, &scope_id, &port, ip, r->pool);
1064         if (!rv) {
1065             ap_serf_server_t *s = apr_palloc(r->pool, sizeof(ap_serf_server_t));
1066             s->ip = host_str;
1067             s->port = port ? port : 80;
1068             APR_ARRAY_PUSH(servers, ap_serf_server_t *) = s;
1069         }
1070         ip = apr_strtok(NULL, ",", &strtok_state);
1071     }
1072
1073     if (strcmp(order, "random") == 0) {
1074         /* TODO: support order=random */
1075     }
1076
1077     *out_servers = servers;
1078
1079     return OK;
1080 }
1081
1082 static const ap_serf_cluster_provider_t builtin_static =
1083 {
1084     "static",
1085     NULL,
1086     &static_config_check,
1087     &static_list_servers,
1088     NULL,
1089     NULL
1090 };
1091
1092 static int serf_post_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *s)
1093 {
1094     apr_status_t rv;
1095     rv = ap_mpm_query(AP_MPMQ_HAS_SERF, &mpm_supprts_serf);
1096
1097     if (rv != APR_SUCCESS) {
1098         mpm_supprts_serf = 0;
1099     }
1100
1101     return OK;
1102 }
1103
1104 static void register_hooks(apr_pool_t *p)
1105 {
1106     ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
1107                          "heartbeat", "0", &builtin_heartbeat);
1108
1109     ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
1110                          "static", "0", &builtin_static);
1111
1112     ap_hook_post_config(serf_post_config, NULL, NULL, APR_HOOK_MIDDLE);
1113
1114     ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST);
1115 }
1116
1117 AP_DECLARE_MODULE(serf) =
1118 {
1119     STANDARD20_MODULE_STUFF,
1120     create_dir_config,
1121     NULL,
1122     create_server_config,
1123     merge_server_config,
1124     serf_cmds,
1125     register_hooks
1126 };