2 * Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
17 * Originally written @ BBC by Graham Leggett
18 * Copyright 2009-2011 British Broadcasting Corporation
24 #include "apr_buckets.h"
25 #include "apr_file_io.h"
26 #include "apr_file_info.h"
29 #include "apr_portable.h"
30 #include "apr_getopt.h"
31 #include "apr_signal.h"
32 #include "apr_strings.h"
41 #include "ap_release.h"
/* Default values for the tool's tunables; every default is currently 0.
 * Where they are consumed is not visible in this excerpt. */
43 #define DEFAULT_MAXLINES 0
44 #define DEFAULT_MAXSIZE 0
/* Parenthesised so the expansion behaves as a single operand when placed next
 * to higher-precedence operators (e.g. ~DEFAULT_AGE). The 1000 * 1000 factor
 * presumably converts seconds to microseconds (apr_time_t units) - TODO confirm. */
45 #define DEFAULT_AGE (0 * 1000 * 1000)
46 #define DEFAULT_PREFIX 0
47 #define DEFAULT_NONBLOCK 0
49 typedef struct file_rec
55 const char *directory;
56 apr_bucket_alloc_t *alloc;
57 apr_bucket_brigade *bb;
58 apr_hash_t *request_uuids;
59 apr_hash_t *response_uuids;
62 apr_size_t skipped_bytes;
63 apr_size_t dropped_fragments;
68 typedef struct uuid_rec
79 typedef struct filter_rec
86 typedef struct header_rec
91 char uuid[APR_UUID_FORMATTED_LENGTH + 1];
96 static const apr_getopt_option_t
104 " --file, -f <name>\t\t\tFile to read the firehose from.\n\t\t\t\t\tDefaults to stdin." },
109 " --output-directory, -o <name>\tDirectory to write demuxed connections\n\t\t\t\t\tto." },
114 " --uuid, -u <uuid>\t\t\tThe UUID of the connection to\n\t\t\t\t\tdemultiplex. Can be specified more\n\t\t\t\t\tthan once." },
115 /* { "output-host", 'h', 1,
116 " --output-host, -h <hostname>\tHostname to write demuxed connections to." },*/
121 " --speed, -s <factor>\tSpeed up or slow down demuxing\n\t\t\t\tby the given factor." },*/
122 { "help", 258, 0, " --help, -h\t\t\t\tThis help text." },
124 " --version\t\t\t\tDisplay the version of the program." },
127 #define HELP_HEADER "Usage : %s [options] [prefix1 [prefix2 ...]]\n\n" \
128 "Firehose demultiplexes the given stream of multiplexed connections, and\n" \
129 "writes each connection to a file, or to a socket as appropriate.\n" \
131 "When writing to files, each connection is placed into a dedicated file\n" \
132 "named after the UUID of the connection within the stream. Separate files\n" \
133 "will be created if requests and responses are found in the stream.\n" \
135 "If an optional prefix is specified as a parameter, connections that start\n" \
136 "with the given prefix will be included. The prefix needs to fit completely\n" \
137 "within the first fragment for a successful match to occur.\n" \
139 /* "When writing to a socket, new connections\n"
140 * "are opened for each connection in the stream, allowing it to be possible to\n"
141 * "'replay' traffic recorded by one server to other server.\n"
/* Footer text that help() prints (followed by a newline) after the option
 * descriptions; currently empty. */
144 #define HELP_FOOTER ""
149 static void version(const char * const progname)
151 printf("%s (%s)\n", progname, AP_SERVER_VERSION);
155 * Help the long-suffering end user: print the usage header, each option's description, and the footer.
157 static void help(const char *argv, const char * header, const char *footer,
158 const apr_getopt_option_t opts[])
163 printf(header, argv);
166 while (opts[i].name) {
167 printf("%s\n", opts[i].description);
172 printf("%s\n", footer);
177 * Clean up a uuid record. Removes the record from the file's request or response uuid hashtable, depending on its direction.
179 static apr_status_t cleanup_uuid_rec(void *dummy)
181 uuid_rec *rec = (uuid_rec *) dummy;
183 if (rec->direction == '>') {
184 apr_hash_set(rec->file->response_uuids, rec->uuid, APR_HASH_KEY_STRING,
187 if (rec->direction == '<') {
188 apr_hash_set(rec->file->request_uuids, rec->uuid, APR_HASH_KEY_STRING,
196 * Create a uuid record and register a cleanup for its destruction.
198 static apr_status_t make_uuid_rec(file_rec *file, header_rec *header,
203 apr_pool_create(&pool, file->pool);
205 rec = apr_pcalloc(pool, sizeof(uuid_rec));
208 rec->uuid = apr_pstrdup(pool, header->uuid);
210 rec->last = header->timestamp;
211 rec->direction = header->direction;
213 if (header->direction == '>') {
214 apr_hash_set(file->response_uuids, rec->uuid, APR_HASH_KEY_STRING, rec);
216 if (header->direction == '<') {
217 apr_hash_set(file->request_uuids, rec->uuid, APR_HASH_KEY_STRING, rec);
220 apr_pool_cleanup_register(pool, rec, cleanup_uuid_rec, cleanup_uuid_rec);
227 * Process the end of the fragment body.
229 * This function sets the modification time of the completed stream and renames it to its final name.
231 static apr_status_t finalise_body(file_rec *file, header_rec *header)
234 char *nfrom, *nto, *from, *to;
237 apr_pool_create(&pool, file->pool);
239 to = apr_pstrcat(pool, header->uuid, header->direction == '>' ? ".response"
241 from = apr_pstrcat(pool, to, ".part", NULL);
243 status = apr_filepath_merge(&nfrom, file->directory, from,
244 APR_FILEPATH_SECUREROOT, pool);
245 if (APR_SUCCESS == status) {
246 status = apr_filepath_merge(&nto, file->directory, to,
247 APR_FILEPATH_SECUREROOT, pool);
248 if (APR_SUCCESS == status) {
249 if (APR_SUCCESS == (status = apr_file_mtime_set(nfrom, file->end, pool))) {
250 if (APR_SUCCESS != (status = apr_file_rename(nfrom, nto, pool))) {
253 "Could not rename file '%s' to '%s' for fragment write: %pm\n",
254 nfrom, nto, &status);
260 "Could not set mtime on file '%s' to '%" APR_TIME_T_FMT "' for fragment write: %pm\n",
261 nfrom, file->end, &status);
265 apr_file_printf(file->file_err,
266 "Could not merge directory '%s' with file '%s': %pm\n",
267 file->directory, to, &status);
271 apr_file_printf(file->file_err,
272 "Could not merge directory '%s' with file '%s': %pm\n",
273 file->directory, from, &status);
276 apr_pool_destroy(pool);
282 * Check if the fragment matches one of the prefixes.
284 static int check_prefix(file_rec *file, header_rec *header, const char *str,
287 apr_hash_index_t *hi;
292 apr_pool_create(&pool, file->pool);
294 for (hi = apr_hash_first(pool, file->filters); hi; hi = apr_hash_next(hi)) {
296 apr_hash_this(hi, NULL, NULL, &val);
297 filter = (filter_rec *) val;
299 if (len > filter->len && !strncmp(filter->prefix, str, filter->len)) {
306 apr_pool_destroy(pool);
312 * Process part of the fragment body, given the header parameters.
314 * Currently, we append it to a file named after the UUID of the connection.
316 * The file is opened on demand and closed when done, so that we are
317 * guaranteed never to hit a file handle limit (within reason).
319 static apr_status_t process_body(file_rec *file, header_rec *header,
320 const char *str, apr_size_t len)
328 file->start = header->timestamp;
330 file->end = header->timestamp;
332 apr_pool_create(&pool, file->pool);
335 = apr_pstrcat(pool, header->uuid,
336 header->direction == '>' ? ".response.part"
337 : ".request.part", NULL);
339 status = apr_filepath_merge(&native, file->directory, name,
340 APR_FILEPATH_SECUREROOT, pool);
341 if (APR_SUCCESS == status) {
342 if (APR_SUCCESS == (status = apr_file_open(&handle, native, APR_WRITE
343 | APR_CREATE | APR_APPEND, APR_OS_DEFAULT, pool))) {
344 if (APR_SUCCESS != (status = apr_file_write_full(handle, str, len,
346 apr_file_printf(file->file_err,
347 "Could not write fragment body to file '%s': %pm\n",
352 apr_file_printf(file->file_err,
353 "Could not open file '%s' for fragment write: %pm\n",
358 apr_file_printf(file->file_err,
359 "Could not merge directory '%s' with file '%s': %pm\n",
360 file->directory, name, &status);
363 apr_pool_destroy(pool);
369 * Parse a hexadecimal number, detect overflow.
370 * There are two error cases:
371 * 1) If the conversion would require too many bits, a -1 is returned.
372 * 2) If the conversion used the correct number of bits, but an overflow
373 * caused only the sign bit to flip, then that negative number is
375 * In general, any negative number can be considered an overflow error.
377 static apr_status_t read_hex(const char **buf, apr_uint64_t *val)
379 const char *b = *buf;
380 apr_uint64_t chunksize = 0;
381 apr_size_t chunkbits = sizeof(apr_uint64_t) * 8;
383 if (!apr_isxdigit(*b)) {
386 /* Skip leading zeros */
391 while (apr_isxdigit(*b) && (chunkbits > 0)) {
394 if (*b >= '0' && *b <= '9') {
397 else if (*b >= 'A' && *b <= 'F') {
398 xvalue = *b - 'A' + 0xa;
400 else if (*b >= 'a' && *b <= 'f') {
401 xvalue = *b - 'a' + 0xa;
404 chunksize = (chunksize << 4) | xvalue;
409 if (apr_isxdigit(*b) && (chunkbits <= 0)) {
420 * Parse what might be a fragment header line.
422 * If the parse doesn't match for any reason, an error is returned, otherwise
425 * The header structure will be filled with the header values as parsed.
427 static apr_status_t process_header(file_rec *file, header_rec *header,
428 const char *str, apr_size_t len)
434 const char *end = str + len;
436 if (APR_SUCCESS != (status = read_hex(&str, &val))) {
441 if (!apr_isspace(*(str++))) {
445 if (APR_SUCCESS != (status = read_hex(&str, &val))) {
448 header->timestamp = val;
450 if (!apr_isspace(*(str++))) {
454 if (*str != '<' && *str != '>') {
457 header->direction = *str;
460 if (!apr_isspace(*(str++))) {
464 for (i = 0; str[i] && i < APR_UUID_FORMATTED_LENGTH; i++) {
465 header->uuid[i] = str[i];
468 if (apr_uuid_parse(&raw, header->uuid)) {
473 if (!apr_isspace(*(str++))) {
477 if (APR_SUCCESS != (status = read_hex(&str, &val))) {
482 if ((*(str++) != '\r')) {
485 if ((*(str++) != '\n')) {
496 * Read from the file/pipe, and demultiplex any fragments on the incoming stream.
498 * If EOF is detected, this function returns.
500 static apr_status_t demux(file_rec *file)
503 apr_status_t status = APR_SUCCESS;
505 apr_bucket_brigade *bb, *obb;
509 bb = apr_brigade_create(file->pool, file->alloc);
510 obb = apr_brigade_create(file->pool, file->alloc);
511 b = apr_bucket_pipe_create(file->file_in, file->alloc);
513 APR_BRIGADE_INSERT_HEAD(bb, b);
517 /* when the pipe is closed, the pipe disappears from the brigade */
518 if (APR_BRIGADE_EMPTY(bb)) {
522 status = apr_brigade_split_line(obb, bb, APR_BLOCK_READ,
525 if (APR_SUCCESS == status || APR_EOF == status) {
526 char str[HUGE_STRING_LEN];
527 len = HUGE_STRING_LEN;
529 apr_brigade_flatten(obb, str, &len);
531 apr_brigade_cleanup(obb);
533 if (len == HUGE_STRING_LEN) {
534 file->skipped_bytes += len;
538 if (len == 2 && str[0] == '\r' && str[1] == '\n') {
542 file->skipped_bytes += len;
546 status = process_header(file, &header, str, len);
547 if (APR_SUCCESS != status) {
548 file->skipped_bytes += len;
555 if (header.direction == '>') {
556 header.rec = apr_hash_get(file->response_uuids,
557 header.uuid, APR_HASH_KEY_STRING);
559 if (header.direction == '<') {
560 header.rec = apr_hash_get(file->request_uuids,
561 header.uuid, APR_HASH_KEY_STRING);
564 /* does the count match what is expected? */
565 if (header.count != header.rec->count) {
566 file->dropped_fragments++;
571 /* must we ignore unknown uuids? */
576 /* is the counter not what we expect? */
577 else if (header.count != 0) {
578 file->skipped_bytes += len;
582 /* otherwise, make a new uuid */
584 make_uuid_rec(file, &header, &header.rec);
589 if (APR_SUCCESS != (status = apr_brigade_partition(bb,
593 "Could not read fragment body from input file: %pm\n", &status);
596 while ((b = APR_BRIGADE_FIRST(bb)) && e != b) {
597 apr_bucket_read(b, &buf, &len, APR_READ_BLOCK);
598 if (!ignore && !header.count && !check_prefix(file,
599 &header, buf, len)) {
603 status = process_body(file, &header, buf, len);
604 header.rec->offset += len;
606 if (ignore || APR_SUCCESS != status) {
607 apr_bucket_delete(b);
608 file->skipped_bytes += len;
611 apr_bucket_delete(b);
620 /* an empty header means end-of-connection */
624 status = process_body(file, &header, "", 0);
626 status = finalise_body(file, &header);
628 apr_pool_destroy(header.rec->pool);
637 apr_file_printf(file->file_err,
638 "Could not read fragment header from input file: %pm\n", &status);
648 * Start the application.
650 int main(int argc, const char * const argv[])
660 /* let's get APR off the ground, and make sure it terminates cleanly */
661 if (APR_SUCCESS != (status = apr_app_initialize(&argc, &argv, NULL))) {
664 atexit(apr_terminate);
666 if (APR_SUCCESS != (status = apr_pool_create(&pool, NULL))) {
671 apr_signal_block(SIGPIPE);
674 file = apr_pcalloc(pool, sizeof(file_rec));
675 apr_file_open_stderr(&file->file_err, pool);
676 apr_file_open_stdin(&file->file_in, pool);
677 apr_file_open_stdout(&file->file_out, pool);
680 file->alloc = apr_bucket_alloc_create(pool);
681 file->bb = apr_brigade_create(pool, file->alloc);
682 file->request_uuids = apr_hash_make(pool);
683 file->response_uuids = apr_hash_make(pool);
684 file->filters = apr_hash_make(pool);
686 apr_getopt_init(&opt, pool, argc, argv);
687 while ((status = apr_getopt_long(opt, cmdline_opts, &optch, &optarg))
692 status = apr_file_open(&file->file_in, optarg, APR_FOPEN_READ,
693 APR_OS_DEFAULT, pool);
694 if (status != APR_SUCCESS) {
695 apr_file_printf(file->file_err,
696 "Could not open file '%s' for read: %pm\n", optarg, &status);
703 status = apr_stat(&finfo, optarg, APR_FINFO_TYPE, pool);
704 if (status != APR_SUCCESS) {
705 apr_file_printf(file->file_err,
706 "Directory '%s' could not be found: %pm\n", optarg, &status);
709 if (finfo.filetype != APR_DIR) {
710 apr_file_printf(file->file_err,
711 "Path '%s' isn't a directory\n", optarg);
714 file->directory = optarg;
720 apr_pool_create(&pchild, pool);
721 rec = apr_pcalloc(pchild, sizeof(uuid_rec));
724 apr_hash_set(file->request_uuids, optarg, APR_HASH_KEY_STRING, rec);
725 apr_hash_set(file->response_uuids, optarg, APR_HASH_KEY_STRING, rec);
734 help(argv[0], HELP_HEADER, HELP_FOOTER, cmdline_opts);
741 if (APR_SUCCESS != status && APR_EOF != status) {
745 /* read filters from the command line */
746 while (opt->ind < argc) {
749 apr_pool_create(&pchild, pool);
750 filter = apr_pcalloc(pchild, sizeof(filter_rec));
751 filter->pool = pchild;
752 filter->prefix = opt->argv[opt->ind];
753 filter->len = strlen(opt->argv[opt->ind]);
754 apr_hash_set(file->filters, opt->argv[opt->ind], APR_HASH_KEY_STRING,
759 status = demux(file);
761 /* warn people if any bytes were skipped or any fragments were dropped */
762 if (file->skipped_bytes || file->dropped_fragments) {
765 "Warning: %" APR_SIZE_T_FMT " bytes skipped, %" APR_SIZE_T_FMT " fragments dropped.\n",
766 file->skipped_bytes, file->dropped_fragments);
769 if (APR_SUCCESS != status) {