]> granicus.if.org Git - apache/blob - support/firehose.c
httpdunit: merge to trunk from feature branch
[apache] / support / firehose.c
1 /**
2  * Licensed under the Apache License, Version 2.0 (the "License");
3  * you may not use this file except in compliance with the License.
4  * You may obtain a copy of the License at
5  *
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  *
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  */
15
16 /*
17  * Originally written @ BBC by Graham Leggett
18  * Copyright 2009-2011 British Broadcasting Corporation
19  *
20  */
21
22 #include "apr.h"
23 #include "apr_lib.h"
24 #include "apr_buckets.h"
25 #include "apr_file_io.h"
26 #include "apr_file_info.h"
27 #include "apr_hash.h"
28 #include "apr_poll.h"
29 #include "apr_portable.h"
30 #include "apr_getopt.h"
31 #include "apr_signal.h"
32 #include "apr_strings.h"
33 #include "apr_uuid.h"
34 #if APR_HAVE_STDLIB_H
35 #include <stdlib.h>
36 #endif
37 #if APR_HAVE_STRING_H
38 #include <string.h>
39 #endif
40
41 #include "ap_release.h"
42
43 #define DEFAULT_MAXLINES 0
44 #define DEFAULT_MAXSIZE 0
45 #define DEFAULT_AGE 0 * 1000 * 1000
46 #define DEFAULT_PREFIX 0
47 #define DEFAULT_NONBLOCK 0
48
49 typedef struct file_rec
50 {
51     apr_pool_t *pool;
52     apr_file_t *file_err;
53     apr_file_t *file_in;
54     apr_file_t *file_out;
55     const char *directory;
56     apr_bucket_alloc_t *alloc;
57     apr_bucket_brigade *bb;
58     apr_hash_t *request_uuids;
59     apr_hash_t *response_uuids;
60     apr_hash_t *filters;
61     int limit;
62     apr_size_t skipped_bytes;
63     apr_size_t dropped_fragments;
64     apr_time_t start;
65     apr_time_t end;
66 } file_rec;
67
68 typedef struct uuid_rec
69 {
70     apr_pool_t *pool;
71     const char *uuid;
72     file_rec *file;
73     apr_uint64_t count;
74     apr_time_t last;
75     apr_size_t offset;
76     int direction;
77 } uuid_rec;
78
79 typedef struct filter_rec
80 {
81     apr_pool_t *pool;
82     const char *prefix;
83     apr_size_t len;
84 } filter_rec;
85
86 typedef struct header_rec
87 {
88     apr_size_t len;
89     apr_time_t timestamp;
90     int direction;
91     char uuid[APR_UUID_FORMATTED_LENGTH + 1];
92     apr_uint64_t count;
93     uuid_rec *rec;
94 } header_rec;
95
96 static const apr_getopt_option_t
97         cmdline_opts[] =
98         {
99                 /* commands */
100                 {
101                         "file",
102                         'f',
103                         1,
104                         "   --file, -f <name>\t\t\tFile to read the firehose from.\n\t\t\t\t\tDefaults to stdin." },
105                 {
106                         "output-directory",
107                         'd',
108                         1,
109                         "   --output-directory, -o <name>\tDirectory to write demuxed connections\n\t\t\t\t\tto." },
110                 {
111                         "uuid",
112                         'u',
113                         1,
114                         "   --uuid, -u <uuid>\t\t\tThe UUID of the connection to\n\t\t\t\t\tdemultiplex. Can be specified more\n\t\t\t\t\tthan once." },
115                 /*                              { "output-host", 'h', 1,
116                  "   --output-host, -h <hostname>\tHostname to write demuxed connections to." },*/
117                 /*                              {
118                  "speed",
119                  's',
120                  1,
121                  "   --speed, -s <factor>\tSpeed up or slow down demuxing\n\t\t\t\tby the given factor." },*/
122                 { "help", 258, 0, "   --help, -h\t\t\t\tThis help text." },
123                 { "version", 257, 0,
124                         "   --version\t\t\t\tDisplay the version of the program." },
125                 { NULL } };
126
127 #define HELP_HEADER "Usage : %s [options] [prefix1 [prefix2 ...]]\n\n" \
128                     "Firehose demultiplexes the given stream of multiplexed connections, and\n" \
129                     "writes each connection to a file, or to a socket as appropriate.\n" \
130                     "\n" \
131                     "When writing to files, each connection is placed into a dedicated file\n" \
132                     "named after the UUID of the connection within the stream. Separate files\n" \
133                     "will be created if requests and responses are found in the stream.\n" \
134                     "\n" \
135                     "If an optional prefix is specified as a parameter, connections that start\n" \
136                     "with the given prefix will be included. The prefix needs to fit completely\n" \
137                     "within the first fragment for a successful match to occur.\n" \
138                     "\n"
139 /*                    "When writing to a socket, new connections\n"
140  *                    "are opened for each connection in the stream, allowing it to be possible to\n"
141  *                    "'replay' traffic recorded by one server to other server.\n"
142  *                    "\n\n"
143  */
144 #define HELP_FOOTER ""
145
146 /**
147  * Who are we again?
148  */
149 static void version(const char * const progname)
150 {
151     printf("%s (%s)\n", progname, AP_SERVER_VERSION);
152 }
153
154 /**
155  * Help the long suffering end user.
156  */
157 static void help(const char *argv, const char * header, const char *footer,
158         const apr_getopt_option_t opts[])
159 {
160     int i = 0;
161
162     if (header) {
163         printf(header, argv);
164     }
165
166     while (opts[i].name) {
167         printf("%s\n", opts[i].description);
168         i++;
169     }
170
171     if (footer) {
172         printf("%s\n", footer);
173     }
174 }
175
176 /**
177  * Cleanup a uuid record. Removes the record from the uuid hashtable in files.
178  */
179 static apr_status_t cleanup_uuid_rec(void *dummy)
180 {
181     uuid_rec *rec = (uuid_rec *) dummy;
182
183     if (rec->direction == '>') {
184         apr_hash_set(rec->file->response_uuids, rec->uuid, APR_HASH_KEY_STRING,
185                 NULL);
186     }
187     if (rec->direction == '<') {
188         apr_hash_set(rec->file->request_uuids, rec->uuid, APR_HASH_KEY_STRING,
189                 NULL);
190     }
191
192     return APR_SUCCESS;
193 }
194
195 /**
196  * Create a uuid record, register a cleanup for it's destruction.
197  */
198 static apr_status_t make_uuid_rec(file_rec *file, header_rec *header,
199         uuid_rec **ptr)
200 {
201     apr_pool_t *pool;
202     uuid_rec *rec;
203     apr_pool_create(&pool, file->pool);
204
205     rec = apr_pcalloc(pool, sizeof(uuid_rec));
206     rec->pool = pool;
207     rec->file = file;
208     rec->uuid = apr_pstrdup(pool, header->uuid);
209     rec->count = 0;
210     rec->last = header->timestamp;
211     rec->direction = header->direction;
212
213     if (header->direction == '>') {
214         apr_hash_set(file->response_uuids, rec->uuid, APR_HASH_KEY_STRING, rec);
215     }
216     if (header->direction == '<') {
217         apr_hash_set(file->request_uuids, rec->uuid, APR_HASH_KEY_STRING, rec);
218     }
219
220     apr_pool_cleanup_register(pool, rec, cleanup_uuid_rec, cleanup_uuid_rec);
221
222     *ptr = rec;
223     return APR_SUCCESS;
224 }
225
226 /**
227  * Process the end of the fragment body.
228  *
229  * This function renames the completed stream to it's final name.
230  */
231 static apr_status_t finalise_body(file_rec *file, header_rec *header)
232 {
233     apr_status_t status;
234     char *nfrom, *nto, *from, *to;
235     apr_pool_t *pool;
236
237     apr_pool_create(&pool, file->pool);
238
239     to = apr_pstrcat(pool, header->uuid, header->direction == '>' ? ".response"
240             : ".request", NULL);
241     from = apr_pstrcat(pool, to, ".part", NULL);
242
243     status = apr_filepath_merge(&nfrom, file->directory, from,
244             APR_FILEPATH_SECUREROOT, pool);
245     if (APR_SUCCESS == status) {
246         status = apr_filepath_merge(&nto, file->directory, to,
247                 APR_FILEPATH_SECUREROOT, pool);
248         if (APR_SUCCESS == status) {
249             if (APR_SUCCESS == (status = apr_file_mtime_set(nfrom, file->end, pool))) {
250                 if (APR_SUCCESS != (status = apr_file_rename(nfrom, nto, pool))) {
251                     apr_file_printf(
252                             file->file_err,
253                             "Could not rename file '%s' to '%s' for fragment write: %pm\n",
254                             nfrom, nto, &status);
255                 }
256             }
257             else {
258                 apr_file_printf(
259                         file->file_err,
260                         "Could not set mtime on file '%s' to '%" APR_TIME_T_FMT "' for fragment write: %pm\n",
261                         nfrom, file->end, &status);
262             }
263         }
264         else {
265             apr_file_printf(file->file_err,
266                     "Could not merge directory '%s' with file '%s': %pm\n",
267                     file->directory, to, &status);
268         }
269     }
270     else {
271         apr_file_printf(file->file_err,
272                 "Could not merge directory '%s' with file '%s': %pm\n",
273                 file->directory, from, &status);
274     }
275
276     apr_pool_destroy(pool);
277
278     return status;
279 }
280
281 /**
282  * Check if the fragment matches on of the prefixes.
283  */
284 static int check_prefix(file_rec *file, header_rec *header, const char *str,
285         apr_size_t len)
286 {
287     apr_hash_index_t *hi;
288     void *val;
289     apr_pool_t *pool;
290     int match = -1;
291
292     apr_pool_create(&pool, file->pool);
293
294     for (hi = apr_hash_first(pool, file->filters); hi; hi = apr_hash_next(hi)) {
295         filter_rec *filter;
296         apr_hash_this(hi, NULL, NULL, &val);
297         filter = (filter_rec *) val;
298
299         if (len > filter->len && !strncmp(filter->prefix, str, filter->len)) {
300             match = 1;
301             break;
302         }
303         match = 0;
304     }
305
306     apr_pool_destroy(pool);
307
308     return match;
309 }
310
311 /**
312  * Process part of the fragment body, given the header parameters.
313  *
314  * Currently, we append it to a file named after the UUID of the connection.
315  *
316  * The file is opened on demand and closed when done, so that we are
317  * guaranteed never to hit a file handle limit (within reason).
318  */
319 static apr_status_t process_body(file_rec *file, header_rec *header,
320         const char *str, apr_size_t len)
321 {
322     apr_status_t status;
323     char *native, *name;
324     apr_pool_t *pool;
325     apr_file_t *handle;
326
327     if (!file->start) {
328         file->start = header->timestamp;
329     }
330     file->end = header->timestamp;
331
332     apr_pool_create(&pool, file->pool);
333
334     name
335             = apr_pstrcat(pool, header->uuid,
336                     header->direction == '>' ? ".response.part"
337                             : ".request.part", NULL);
338
339     status = apr_filepath_merge(&native, file->directory, name,
340             APR_FILEPATH_SECUREROOT, pool);
341     if (APR_SUCCESS == status) {
342         if (APR_SUCCESS == (status = apr_file_open(&handle, native, APR_WRITE
343                 | APR_CREATE | APR_APPEND, APR_OS_DEFAULT, pool))) {
344             if (APR_SUCCESS != (status = apr_file_write_full(handle, str, len,
345                     NULL))) {
346                 apr_file_printf(file->file_err,
347                         "Could not write fragment body to file '%s': %pm\n",
348                         native, &status);
349             }
350         }
351         else {
352             apr_file_printf(file->file_err,
353                     "Could not open file '%s' for fragment write: %pm\n",
354                     native, &status);
355         }
356     }
357     else {
358         apr_file_printf(file->file_err,
359                 "Could not merge directory '%s' with file '%s': %pm\n",
360                 file->directory, name, &status);
361     }
362
363     apr_pool_destroy(pool);
364
365     return status;
366 }
367
368 /**
369  * Parse a chunk extension, detect overflow.
370  * There are two error cases:
371  *  1) If the conversion would require too many bits, a -1 is returned.
372  *  2) If the conversion used the correct number of bits, but an overflow
373  *     caused only the sign bit to flip, then that negative number is
374  *     returned.
375  * In general, any negative number can be considered an overflow error.
376  */
377 static apr_status_t read_hex(const char **buf, apr_uint64_t *val)
378 {
379     const char *b = *buf;
380     apr_uint64_t chunksize = 0;
381     apr_size_t chunkbits = sizeof(apr_uint64_t) * 8;
382
383     if (!apr_isxdigit(*b)) {
384         return APR_EGENERAL;
385     }
386     /* Skip leading zeros */
387     while (*b == '0') {
388         ++b;
389     }
390
391     while (apr_isxdigit(*b) && (chunkbits > 0)) {
392         int xvalue = 0;
393
394         if (*b >= '0' && *b <= '9') {
395             xvalue = *b - '0';
396         }
397         else if (*b >= 'A' && *b <= 'F') {
398             xvalue = *b - 'A' + 0xa;
399         }
400         else if (*b >= 'a' && *b <= 'f') {
401             xvalue = *b - 'a' + 0xa;
402         }
403
404         chunksize = (chunksize << 4) | xvalue;
405         chunkbits -= 4;
406         ++b;
407     }
408     *buf = b;
409     if (apr_isxdigit(*b) && (chunkbits <= 0)) {
410         /* overflow */
411         return APR_EGENERAL;
412     }
413
414     *val = chunksize;
415
416     return APR_SUCCESS;
417 }
418
419 /**
420  * Parse what might be a fragment header line.
421  *
422  * If the parse doesn't match for any reason, an error is returned, otherwise
423  * APR_SUCCESS.
424  *
425  * The header structure will be filled with the header values as parsed.
426  */
427 static apr_status_t process_header(file_rec *file, header_rec *header,
428         const char *str, apr_size_t len)
429 {
430     apr_uint64_t val;
431     apr_status_t status;
432     int i;
433     apr_uuid_t raw;
434     const char *end = str + len;
435
436     if (APR_SUCCESS != (status = read_hex(&str, &val))) {
437         return status;
438     }
439     header->len = val;
440
441     if (!apr_isspace(*(str++))) {
442         return APR_EGENERAL;
443     }
444
445     if (APR_SUCCESS != (status = read_hex(&str, &val))) {
446         return status;
447     }
448     header->timestamp = val;
449
450     if (!apr_isspace(*(str++))) {
451         return APR_EGENERAL;
452     }
453
454     if (*str != '<' && *str != '>') {
455         return APR_EGENERAL;
456     }
457     header->direction = *str;
458     str++;
459
460     if (!apr_isspace(*(str++))) {
461         return APR_EGENERAL;
462     }
463
464     for (i = 0; str[i] && i < APR_UUID_FORMATTED_LENGTH; i++) {
465         header->uuid[i] = str[i];
466     }
467     header->uuid[i] = 0;
468     if (apr_uuid_parse(&raw, header->uuid)) {
469         return APR_EGENERAL;
470     }
471     str += i;
472
473     if (!apr_isspace(*(str++))) {
474         return APR_EGENERAL;
475     }
476
477     if (APR_SUCCESS != (status = read_hex(&str, &val))) {
478         return status;
479     }
480     header->count = val;
481
482     if ((*(str++) != '\r')) {
483         return APR_EGENERAL;
484     }
485     if ((*(str++) != '\n')) {
486         return APR_EGENERAL;
487     }
488     if (str != end) {
489         return APR_EGENERAL;
490     }
491
492     return APR_SUCCESS;
493 }
494
495 /**
496  * Suck on the file/pipe, and demux any fragments on the incoming stream.
497  *
498  * If EOF is detected, this function returns.
499  */
500 static apr_status_t demux(file_rec *file)
501 {
502     apr_size_t len = 0;
503     apr_status_t status = APR_SUCCESS;
504     apr_bucket *b, *e;
505     apr_bucket_brigade *bb, *obb;
506     int footer = 0;
507     const char *buf;
508
509     bb = apr_brigade_create(file->pool, file->alloc);
510     obb = apr_brigade_create(file->pool, file->alloc);
511     b = apr_bucket_pipe_create(file->file_in, file->alloc);
512
513     APR_BRIGADE_INSERT_HEAD(bb, b);
514
515     do {
516
517         /* when the pipe is closed, the pipe disappears from the brigade */
518         if (APR_BRIGADE_EMPTY(bb)) {
519             break;
520         }
521
522         status = apr_brigade_split_line(obb, bb, APR_BLOCK_READ,
523                 HUGE_STRING_LEN);
524
525         if (APR_SUCCESS == status || APR_EOF == status) {
526             char str[HUGE_STRING_LEN];
527             len = HUGE_STRING_LEN;
528
529             apr_brigade_flatten(obb, str, &len);
530
531             apr_brigade_cleanup(obb);
532
533             if (len == HUGE_STRING_LEN) {
534                 file->skipped_bytes += len;
535                 continue;
536             }
537             else if (footer) {
538                 if (len == 2 && str[0] == '\r' && str[1] == '\n') {
539                     footer = 0;
540                     continue;
541                 }
542                 file->skipped_bytes += len;
543             }
544             else if (len > 0) {
545                 header_rec header;
546                 status = process_header(file, &header, str, len);
547                 if (APR_SUCCESS != status) {
548                     file->skipped_bytes += len;
549                     continue;
550                 }
551                 else {
552                     int ignore = 0;
553
554                     header.rec = NULL;
555                     if (header.direction == '>') {
556                         header.rec = apr_hash_get(file->response_uuids,
557                                 header.uuid, APR_HASH_KEY_STRING);
558                     }
559                     if (header.direction == '<') {
560                         header.rec = apr_hash_get(file->request_uuids,
561                                 header.uuid, APR_HASH_KEY_STRING);
562                     }
563                     if (header.rec) {
564                         /* does the count match what is expected? */
565                         if (header.count != header.rec->count) {
566                             file->dropped_fragments++;
567                             ignore = 1;
568                         }
569                     }
570                     else {
571                         /* must we ignore unknown uuids? */
572                         if (file->limit) {
573                             ignore = 1;
574                         }
575
576                         /* is the counter not what we expect? */
577                         else if (header.count != 0) {
578                             file->skipped_bytes += len;
579                             ignore = 1;
580                         }
581
582                         /* otherwise, make a new uuid */
583                         else {
584                             make_uuid_rec(file, &header, &header.rec);
585                         }
586                     }
587
588                     if (header.len) {
589                         if (APR_SUCCESS != (status = apr_brigade_partition(bb,
590                                 header.len, &e))) {
591                             apr_file_printf(
592                                     file->file_err,
593                                     "Could not read fragment body from input file: %pm\n", &status);
594                             break;
595                         }
596                         while ((b = APR_BRIGADE_FIRST(bb)) && e != b) {
597                             apr_bucket_read(b, &buf, &len, APR_READ_BLOCK);
598                             if (!ignore && !header.count && !check_prefix(file,
599                                     &header, buf, len)) {
600                                 ignore = 1;
601                             }
602                             if (!ignore) {
603                                 status = process_body(file, &header, buf, len);
604                                 header.rec->offset += len;
605                             }
606                             if (ignore || APR_SUCCESS != status) {
607                                 apr_bucket_delete(b);
608                                 file->skipped_bytes += len;
609                                 continue;
610                             }
611                             apr_bucket_delete(b);
612                         }
613                         if (!ignore) {
614                             header.rec->count++;
615                         }
616                         footer = 1;
617                         continue;
618                     }
619                     else {
620                         /* an empty header means end-of-connection */
621                         if (header.rec) {
622                             if (!ignore) {
623                                 if (!header.count) {
624                                     status = process_body(file, &header, "", 0);
625                                 }
626                                 status = finalise_body(file, &header);
627                             }
628                             apr_pool_destroy(header.rec->pool);
629                         }
630                     }
631
632                 }
633             }
634
635         }
636         else {
637             apr_file_printf(file->file_err,
638                     "Could not read fragment header from input file: %pm\n", &status);
639             break;
640         }
641
642     } while (1);
643
644     return status;
645 }
646
647 /**
648  * Start the application.
649  */
650 int main(int argc, const char * const argv[])
651 {
652     apr_status_t status;
653     apr_pool_t *pool;
654     apr_getopt_t *opt;
655     int optch;
656     const char *optarg;
657
658     file_rec *file;
659
660     /* lets get APR off the ground, and make sure it terminates cleanly */
661     if (APR_SUCCESS != (status = apr_app_initialize(&argc, &argv, NULL))) {
662         return 1;
663     }
664     atexit(apr_terminate);
665
666     if (APR_SUCCESS != (status = apr_pool_create(&pool, NULL))) {
667         return 1;
668     }
669
670 #ifdef SIGPIPE
671     apr_signal_block(SIGPIPE);
672 #endif
673
674     file = apr_pcalloc(pool, sizeof(file_rec));
675     apr_file_open_stderr(&file->file_err, pool);
676     apr_file_open_stdin(&file->file_in, pool);
677     apr_file_open_stdout(&file->file_out, pool);
678
679     file->pool = pool;
680     file->alloc = apr_bucket_alloc_create(pool);
681     file->bb = apr_brigade_create(pool, file->alloc);
682     file->request_uuids = apr_hash_make(pool);
683     file->response_uuids = apr_hash_make(pool);
684     file->filters = apr_hash_make(pool);
685
686     apr_getopt_init(&opt, pool, argc, argv);
687     while ((status = apr_getopt_long(opt, cmdline_opts, &optch, &optarg))
688             == APR_SUCCESS) {
689
690         switch (optch) {
691         case 'f': {
692             status = apr_file_open(&file->file_in, optarg, APR_FOPEN_READ,
693                     APR_OS_DEFAULT, pool);
694             if (status != APR_SUCCESS) {
695                 apr_file_printf(file->file_err,
696                         "Could not open file '%s' for read: %pm\n", optarg, &status);
697                 return 1;
698             }
699             break;
700         }
701         case 'd': {
702             apr_finfo_t finfo;
703             status = apr_stat(&finfo, optarg, APR_FINFO_TYPE, pool);
704             if (status != APR_SUCCESS) {
705                 apr_file_printf(file->file_err,
706                         "Directory '%s' could not be found: %pm\n", optarg, &status);
707                 return 1;
708             }
709             if (finfo.filetype != APR_DIR) {
710                 apr_file_printf(file->file_err,
711                         "Path '%s' isn't a directory\n", optarg);
712                 return 1;
713             }
714             file->directory = optarg;
715             break;
716         }
717         case 'u': {
718             apr_pool_t *pchild;
719             uuid_rec *rec;
720             apr_pool_create(&pchild, pool);
721             rec = apr_pcalloc(pchild, sizeof(uuid_rec));
722             rec->pool = pchild;
723             rec->uuid = optarg;
724             apr_hash_set(file->request_uuids, optarg, APR_HASH_KEY_STRING, rec);
725             apr_hash_set(file->response_uuids, optarg, APR_HASH_KEY_STRING, rec);
726             file->limit++;
727             break;
728         }
729         case 257: {
730             version(argv[0]);
731             return 0;
732         }
733         case 258: {
734             help(argv[0], HELP_HEADER, HELP_FOOTER, cmdline_opts);
735             return 0;
736
737         }
738         }
739
740     }
741     if (APR_SUCCESS != status && APR_EOF != status) {
742         return 1;
743     }
744
745     /* read filters from the command line */
746     while (opt->ind < argc) {
747         apr_pool_t *pchild;
748         filter_rec *filter;
749         apr_pool_create(&pchild, pool);
750         filter = apr_pcalloc(pchild, sizeof(filter_rec));
751         filter->pool = pchild;
752         filter->prefix = opt->argv[opt->ind];
753         filter->len = strlen(opt->argv[opt->ind]);
754         apr_hash_set(file->filters, opt->argv[opt->ind], APR_HASH_KEY_STRING,
755                 filter);
756         opt->ind++;
757     }
758
759     status = demux(file);
760
761     /* warn people if any non blocking writes failed */
762     if (file->skipped_bytes || file->dropped_fragments) {
763         apr_file_printf(
764                 file->file_err,
765                 "Warning: %" APR_SIZE_T_FMT " bytes skipped, %" APR_SIZE_T_FMT " fragments dropped.\n",
766                 file->skipped_bytes, file->dropped_fragments);
767     }
768
769     if (APR_SUCCESS != status) {
770         return 1;
771     }
772
773     return 0;
774 }