From: Darold Gilles
Date: Tue, 12 Feb 2013 18:14:42 +0000 (+0100)
Subject: Add progress bar logger for multiprocess by forking a dedicated process and using...
X-Git-Tag: v3.2~41
X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=41a4a87138a5c791d8e9176a6cde82af684161da;p=pgbadger

Add a progress bar logger for multiprocess mode by forking a dedicated process
and using a pipe. Also fix some bugs when using the binary format that
duplicated query/error samples per process.
---

diff --git a/pgbadger b/pgbadger
index b9361d8..9e3ca5c 100755
--- a/pgbadger
+++ b/pgbadger
@@ -38,8 +38,10 @@ use POSIX qw(locale_h sys_wait_h _exit);
 setlocale(LC_NUMERIC, '');
 setlocale(LC_ALL, 'C');
 use File::Temp qw/ :seekable tempfile /;
+use IO::Handle;
 use IO::Pipe;
+
 $VERSION = '2.3';
 
 $SIG{'CHLD'} = 'DEFAULT';
@@ -80,6 +82,7 @@ sub wait_child
 	1 while wait != -1;
 	$SIG{INT} = \&wait_child;
 	$SIG{TERM} = \&wait_child;
+	_exit(0);
 }
 $SIG{INT} = \&wait_child;
 $SIG{TERM} = \&wait_child;
@@ -573,7 +576,6 @@ my @BRACKETS = ('(', ')');
 map {$_ = quotemeta($_)} @BRACKETS;
 
 # Where statistics are stored
-my %STATS = ();
 my $first_log_timestamp = '';
 my $last_log_timestamp = '';
 my $first_log_date = '';
@@ -616,11 +618,14 @@ if ($last_parsed && -e $last_parsed) {
 }
 
 # Main loop reading log files
+my $global_totalsize = 0;
 my @given_log_files = ( @log_files );
 
 # log files must be erase when loading stats from binary format
 @log_files = () if $format eq 'binary';
 
+my $pipe;
+
 # Start parsing all given files using multiprocess
 if ( ($queue_size > 1) || ($job_per_file > 1) ) {
 
@@ -632,11 +637,21 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) {
 		$parallel_process = $job_per_file;
 	}
 	# Store total size of the log files
-	my $global_totalsize = 0;
 	foreach my $logfile ( @given_log_files ) {
 		$global_totalsize += &get_log_file($logfile);
 	}
 
+	# Open a pipe for interprocess communication
+	my $reader = new IO::Handle;
+	my $writer = new IO::Handle;
+	$pipe = IO::Pipe->new($reader, $writer);
+	$writer->autoflush(1);
+
+	# Fork the logger process
+	spawn sub {
+		&multiprocess_progressbar($global_totalsize);
+	};
+
 	# Parse each log file following the multiprocess mode chosen (-j or -J)
 	my @tempfiles = ();
 	foreach my $logfile ( @given_log_files ) {
@@ -670,6 +685,7 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) {
 
 	# Load all data gathered by all the differents processes
 	if (!$abort) {
+		&init_stats_vars();
 		foreach my $f (@tempfiles) {
 			my $fht = new IO::File;
 			$fht->open("< $f->[1]") or die "FATAL: can't open file $f->[1], $!\n";
@@ -702,7 +718,7 @@ my $t1 = Benchmark->new;
 my $td = timediff($t1, $t0);
 &logmsg('DEBUG', "the log statistics gathering took:" . timestr($td));
 
-&logmsg('DEBUG', "Ok, generating $extension report...");
+&logmsg('LOG', "Ok, generating $extension report...");
 
 # Open filehandle
 my $fh = undef;
@@ -877,6 +893,73 @@ This supposes that your log file and HTML report are also rotated every week.
 exit 0;
 }
 
+sub init_stats_vars
+{
+	# Empty where statistics are stored
+	$first_log_timestamp = '';
+	$last_log_timestamp = '';
+	$first_log_date = '';
+	$last_log_date = '';
+	%overall_stat = ();
+	@top_slowest = ();
+	%normalyzed_info = ();
+	%error_info = ();
+	%logs_type = ();
+	%per_hour_info = ();
+	%per_minute_info = ();
+	%lock_info = ();
+	%tempfile_info = ();
+	%connection_info = ();
+	%database_info = ();
+	%application_info = ();
+	%session_info = ();
+	%conn_received = ();
+	%checkpoint_info = ();
+	%autovacuum_info = ();
+	%autoanalyze_info = ();
+	@graph_values = ();
+	%cur_info = ();
+	$nlines = 0;
+	%last_line = ();
+	%saved_last_line = ();
+	%tsung_session = ();
+}
+
+####
+# Main function called per each parser process
+####
+sub multiprocess_progressbar
+{
+	my $totalsize = shift;
+
+	&logmsg('DEBUG', "Starting progressbar writer process");
+
+	$0 = 'pgbadger logger';
+
+	my $timeout = 3;
+	my $cursize = 0;
+	my $nqueries = 0;
+	my $nerrors = 0;
+	#local $SIG{ALRM} = sub { warn "ALARM: no more data from pipe, stopping progress writer.\n"; exit(0); };
+	#alarm $timeout;
+	$pipe->reader();
+	while (my $r = <$pipe>) {
+		chomp($r);
+		my @infos = split(/\s+/, $r);
+		$cursize += $infos[0];
+		$nqueries += $infos[1];
+		$nerrors += $infos[2];
+		$cursize = $totalsize if ($cursize > $totalsize);
+		print STDERR &progress_bar($cursize, $totalsize, 25, '=', $nqueries, $nerrors);
+		last if ($cursize >= $totalsize);
+		#alarm $timeout;
+	}
+	#alarm 0;
+	print STDERR "\n";
+
+	exit 0;
+}
+
 ####
 # Main function called per each parser process
 ####
@@ -884,6 +967,14 @@ sub process_file
 {
 	my ($logfile, $tmpoutfile, $start_offset, $stop_offset) = @_;
 
+	my $old_queries_count = 0;
+	my $old_errors_count = 0;
+	my $current_offset = $start_offset || 0;
+
+	$0 = 'pgbadger parser';
+
+	&init_stats_vars();
+
 	&logmsg('DEBUG', "Starting to parse log file: $logfile");
 
 	my $terminate = 0;
@@ -892,6 +983,8 @@ sub process_file
 
 	my $curdate = localtime(time);
 
+	$pipe->writer() if (defined $pipe);
+
 	# Syslog does not have year information, so take care of year overlapping
 	my ($gsec, $gmin, $ghour, $gmday, $gmon, $gyear, $gwday, $gyday, $gisdst) = localtime(time);
 	$gyear += 1900;
@@ -908,6 +1001,7 @@ sub process_file
 	&logmsg('DEBUG', "Starting reading file $logfile...");
 
 	if ($format eq 'csv') {
+		require Text::CSV_XS;
 
 		my $csv = Text::CSV_XS->new({binary => 1, eol => $/});
@@ -921,11 +1015,20 @@ sub process_file
 			# Set progress statistics
 			$cursize += length(join(',', @$row));
 			$nlines++;
-			if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
-				if ($totalsize) {
-					print STDERR &progress_bar($cursize, $totalsize, 25, '=');
-				} else {
-					print STDERR ".";
+			if (!$tmpoutfile) {
+				if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
+					if ($totalsize) {
+						print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+					} else {
+						print STDERR ".";
+					}
+				}
+			} else {
+				if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
+					$pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
+					$old_queries_count = $overall_stat{'queries_number'};
+					$old_errors_count = $overall_stat{'errors_number'};
+					$cursize = 0;
 				}
 			}
@@ -1014,7 +1117,6 @@ sub process_file
 	my $goon = 0;
 	if ($start_offset) {
 		$lfile->seek($start_offset, 0);
-		$cursize += $start_offset;
 	}
 
 	while (my $line = <$lfile>) {
@@ -1022,20 +1124,30 @@ sub process_file
 		last if ($terminate);
 
 		$cursize += length($line);
+		$current_offset += length($line);
 		chomp($line);
 		$line =~ s/\r//;
 		$nlines++;
 		next if (!$line);
 
-		if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
-			if ($totalsize) {
-				if ($stop_offset > 0) {
-					print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
+		if (!$tmpoutfile) {
+			if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
+				if ($totalsize) {
+					if ($stop_offset > 0) {
+						print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
+					} else {
+						print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+					}
 				} else {
-					print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+					print STDERR ".";
 				}
-			} else {
-				print STDERR ".";
+			}
+		} else {
+			if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
+				$pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
+				$old_queries_count = $overall_stat{'queries_number'};
+				$old_errors_count = $overall_stat{'errors_number'};
+				$cursize = 0;
 			}
 		}
@@ -1227,7 +1339,7 @@ sub process_file
 				# unknown format
 				&logmsg('DEBUG', "Unknown line format: $line");
 			}
-			last if (($stop_offset > 0) && ($cursize > $stop_offset));
+			last if (($stop_offset > 0) && ($current_offset > $stop_offset));
 		}
 	}
 	close $lfile;
@@ -1244,14 +1356,21 @@ sub process_file
 	%cur_info = ();
 
 	if ($progress) {
-		if ($totalsize) {
-			if (($stop_offset > 0) && ($format ne 'csv')) {
-				print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=', $logfile);
-			} else {
-				print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile);
+		if (!$tmpoutfile) {
+			if ($totalsize) {
+				if (($stop_offset > 0) && ($format ne 'csv')) {
+					print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
+				} else {
+					print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+				}
+				print STDERR "\n";
 			}
+		} else {
+			$pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
+			$old_queries_count = $overall_stat{'queries_number'};
+			$old_errors_count = $overall_stat{'errors_number'};
+			$cursize = 0;
 		}
-		print STDERR "\n";
 	}
 	if ($tmpoutfile) {
 		&dump_as_binary($tmpoutfile);
@@ -3987,7 +4106,9 @@ sub load_stats
 
 	### log_files ###
 
-	@log_files = (@log_files, @_log_files);
+	foreach my $f (@_log_files) {
+		push(@log_files, $f) if (!grep(m#^$f$#, @_log_files));
+	}
 
 	### per_hour_info ###
 
@@ -4022,19 +4143,21 @@ sub load_stats
 	foreach my $q (keys %_error_info) {
 
 		$error_info{$q}{count} += $_error_info{$q}{count};
-		push(@{$error_info{$q}{date}}, @{$_error_info{$q}{date}});
-		push(@{$error_info{$q}{detail}}, @{$_error_info{$q}{detail}});
-		push(@{$error_info{$q}{context}}, @{$_error_info{$q}{context}});
-		push(@{$error_info{$q}{statement}}, @{$_error_info{$q}{statement}});
-		push(@{$error_info{$q}{hint}}, @{$_error_info{$q}{hint}});
-		push(@{$error_info{$q}{error}}, @{$_error_info{$q}{error}});
-		push(@{$error_info{$q}{db}}, @{$_error_info{$q}{db}});
-		foreach my $day (keys %{ $_error_info{$q}{chronos} }) {
-			foreach my $hour (keys %{$_error_info{$q}{chronos}{$day}}) {
-				$error_info{$q}{chronos}{$day}{$hour}{count} += $_error_info{$q}{chronos}{$day}{$hour}{count};
+		# Keep only the wanted sample number
+		if (!exists $error_info{$q}{date} || ($#{$error_info{$q}{date}} < $sample)) {
+			push(@{$error_info{$q}{date}}, @{$_error_info{$q}{date}});
+			push(@{$error_info{$q}{detail}}, @{$_error_info{$q}{detail}});
+			push(@{$error_info{$q}{context}}, @{$_error_info{$q}{context}});
+			push(@{$error_info{$q}{statement}}, @{$_error_info{$q}{statement}});
+			push(@{$error_info{$q}{hint}}, @{$_error_info{$q}{hint}});
+			push(@{$error_info{$q}{error}}, @{$_error_info{$q}{error}});
+			push(@{$error_info{$q}{db}}, @{$_error_info{$q}{db}});
+			foreach my $day (keys %{ $_error_info{$q}{chronos} }) {
+				foreach my $hour (keys %{$_error_info{$q}{chronos}{$day}}) {
+					$error_info{$q}{chronos}{$day}{$hour}{count} += $_error_info{$q}{chronos}{$day}{$hour}{count};
+				}
 			}
 		}
-	}
 
 	### per_minute_info ###
 
@@ -4106,6 +4229,15 @@ sub load_stats
 			$normalyzed_info{$stmt}{samples}{$dt} = $_normalyzed_info{$stmt}{samples}{$dt};
 		}
 
+		# Keep only the top N samples
+		my $i = 1;
+		foreach my $k (sort {$b <=> $a} keys %{$normalyzed_info{$stmt}{samples}}) {
+			if ($i > $sample) {
+				delete $normalyzed_info{$stmt}{samples}{$k};
+			}
+			$i++;
+		}
+
 		$normalyzed_info{$stmt}{count} += $_normalyzed_info{$stmt}{count};
 
 		foreach my $day (keys %{$_normalyzed_info{$stmt}{chronos}} ) {
@@ -5115,7 +5247,7 @@ sub autodetect_duration
 
 sub progress_bar
 {
-	my ($got, $total, $width, $char) = @_;
+	my ($got, $total, $width, $char, $queries, $errors) = @_;
 	$width ||= 25;
 	$char ||= '=';
 	my $num_width = length $total;
@@ -5123,19 +5255,19 @@ sub progress_bar
 		sprintf(
 			"[%-${width}s] Parsed %${num_width}s bytes of %s (%.2f%%), queries: %d\r",
 			$char x (($width - 1) * $got / $total) . '>',
-			$got, $total, 100 * $got / +$total, $tsung_queries
+			$got, $total, 100 * $got / +$total, ($queries || $tsung_queries)
 		);
 	} elsif($format eq 'binary') {
 		my $file = $_[-1];
 		sprintf(
 			"Loaded %d queries and %d events from binary file %s...\r",
-			$overall_stat{'queries_number'}, $overall_stat{'errors_number'}, $file
+			($queries || $overall_stat{'queries_number'}), ($errors || $overall_stat{'errors_number'}), $file
		);
 	} else {
 		sprintf(
 			"[%-${width}s] Parsed %${num_width}s bytes of %s (%.2f%%), queries: %d, events: %d\r",
 			$char x (($width - 1) * $got / $total) . '>',
-			$got, $total, 100 * $got / +$total, $overall_stat{'queries_number'}, $overall_stat{'errors_number'}
+			$got, $total, 100 * $got / +$total, ($queries || $overall_stat{'queries_number'}), ($errors || $overall_stat{'errors_number'})
 		);
 	}
 }
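
Note on the mechanism: the hunks above open one shared IO::Pipe, fork a dedicated "pgbadger logger" child that takes the read end and redraws the progress bar, and let every parser process take the write end and report "bytes queries errors" increments. The standalone sketch below only illustrates that fork-and-pipe scheme; it is not code from the patch. It uses a plain fork() instead of pgbadger's spawn helper, and the byte total, worker count and sleep times are made up for the example.

#!/usr/bin/perl
use strict;
use warnings;
use IO::Handle;
use IO::Pipe;
use POSIX qw(_exit);

my $total = 1000;                 # pretend total size of all log files
my $pipe  = IO::Pipe->new();      # one pipe shared by the logger and all workers

my $logger_pid = fork();
die "fork failed: $!\n" unless defined $logger_pid;

if ($logger_pid == 0) {
	# Logger child: take the read end and own the terminal progress bar.
	$pipe->reader();
	my ($done, $queries, $errors) = (0, 0, 0);
	while (my $msg = <$pipe>) {
		chomp $msg;
		my ($chunk, $q, $e) = split /\s+/, $msg;
		$done    += $chunk;
		$queries += $q;
		$errors  += $e;
		$done = $total if $done > $total;
		printf STDERR "[%-25s] %d/%d bytes, queries: %d, errors: %d\r",
			('=' x int(24 * $done / $total)) . '>', $done, $total, $queries, $errors;
		last if $done >= $total;
	}
	print STDERR "\n";
	_exit(0);
}

# Worker children: take the write end and report "bytes queries errors" increments.
my @workers;
for my $w (1 .. 4) {
	my $pid = fork();
	die "fork failed: $!\n" unless defined $pid;
	if ($pid == 0) {
		$pipe->writer();
		$pipe->autoflush(1);
		for (1 .. 10) {
			select(undef, undef, undef, 0.05);   # pretend to parse a chunk of log
			$pipe->print("25 3 1\n");            # bytes read, new queries, new errors
		}
		_exit(0);
	}
	push @workers, $pid;
}

waitpid($_, 0) for @workers;
waitpid($logger_pid, 0);

Each report line is far below PIPE_BUF, so concurrent writes from several workers do not interleave; that is why a single shared pipe is enough and no locking is needed.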
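The binary-format fix works on the other side of that pipeline: when the parent reloads each worker's binary dump in load_stats, counters are still summed, but sample lists are only extended while they hold fewer than $sample entries, so the same example is no longer stored once per worker. Below is a minimal sketch of that capping idea with a deliberately simplified data layout (the real %error_info / %normalyzed_info structures carry many more fields); it is illustrative only.

#!/usr/bin/perl
use strict;
use warnings;

my $sample = 2;    # how many example entries to keep per normalized query
my %merged;        # merged statistics rebuilt by the parent process

# Merge one worker's dump: counters are always summed, but the sample list
# is only extended while it is still below the $sample cap.
sub merge_chunk {
	my ($chunk) = @_;
	foreach my $q (keys %$chunk) {
		$merged{$q}{count} += $chunk->{$q}{count};
		if (!exists $merged{$q}{samples} || @{ $merged{$q}{samples} } < $sample) {
			push @{ $merged{$q}{samples} }, @{ $chunk->{$q}{samples} };
			# Trim in case the last push went past the cap.
			splice(@{ $merged{$q}{samples} }, $sample)
				if @{ $merged{$q}{samples} } > $sample;
		}
	}
}

# Three workers that saw the same query all report the same example;
# without the cap the report would list it three times.
merge_chunk({ 'select * from t where id = ?' => { count => 10, samples => ['select * from t where id = 1'] } }) for 1 .. 3;

my $q = $merged{'select * from t where id = ?'};
printf "count=%d, samples kept=%d\n", $q->{count}, scalar @{ $q->{samples} };
# prints: count=30, samples kept=2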