From: Darold Gilles Date: Wed, 20 Feb 2013 20:37:49 +0000 (+0100) Subject: Fix freeze of pgbadger in multiprocess mode with -e or -d filtering options X-Git-Tag: v3.2~31 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=71bf39e1a1784c7b945216345abccf74406c07ae;p=pgbadger Fix freeze of pgbadger in multiprocess mode with -e or -d filtering options --- diff --git a/pgbadger b/pgbadger index 9968b8e..4b517ba 100755 --- a/pgbadger +++ b/pgbadger @@ -42,12 +42,18 @@ setlocale(LC_ALL, 'C'); use File::Temp qw/ :seekable tempfile /; use IO::Handle; use IO::Pipe; - +use Time::HiRes qw/usleep/; $VERSION = '3.0'; $SIG{'CHLD'} = 'DEFAULT'; +my $TMP_DIR = '/tmp'; +my %RUNNING_PIDS = (); +my @tempfiles = (); +my $parent_pid = $$; +my $interrupt = 0; + #### # method used to fork as many child as wanted ## @@ -65,6 +71,7 @@ sub spawn print STDERR "Error: cannot fork: $!\n"; return; } elsif ($pid) { + $RUNNING_PIDS{$pid} = $pid; return; # the parent } # the child -- go spawn @@ -74,21 +81,28 @@ sub spawn exit &$coderef(); } +# Informa the parent that it should stop iterate on parsing other files +sub stop_parsing +{ + $interrupt = 1; +} + # With multiprocess we need to wait all childs -my $abort = 0; sub wait_child { my $sig = shift; - $abort = 1; print STDERR "Received terminating signal ($sig).\n"; 1 while wait != -1; $SIG{INT} = \&wait_child; $SIG{TERM} = \&wait_child; + foreach my $f (@tempfiles) { + unlink("$f->[1]") if (-e "$f->[1]"); + } _exit(0); } $SIG{INT} = \&wait_child; $SIG{TERM} = \&wait_child; - +$SIG{USR2} = \&stop_parsing; $| = 1; @@ -622,17 +636,28 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) { }; # Parse each log file following the multiprocess mode chosen (-j or -J) - my @tempfiles = (); foreach my $logfile ( @given_log_files ) { - while ($child_count >= $parallel_process) { $child_count-- if (wait); } - last if ($abort); + while ($child_count >= $parallel_process) { + my $kid = waitpid(-1, WNOHANG); + if ($kid > 0) { + $child_count--; + delete $RUNNING_PIDS{$kid}; + } + usleep(500000); + } if ($queue_size > 1) { # Create multiple process to parse one log file by chunks of data my @chunks = &split_logfile($logfile); for (my $i = 0; $i < $#chunks; $i++) { - while ($child_count >= $parallel_process) { $child_count-- if (wait); } - last if ($abort); - push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ) ]); + while ($child_count >= $parallel_process) { + my $kid = waitpid(-1, WNOHANG); + if ($kid > 0) { + $child_count--; + delete $RUNNING_PIDS{$kid}; + } + usleep(500000); + } + push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', DIR => $TMP_DIR, UNLINK => 1 ) ]); spawn sub { &process_file($logfile, $tempfiles[-1]->[0], $chunks[$i], $chunks[$i+1]); }; @@ -641,38 +666,46 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) { } else { # Create on process per log files to parse - push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ) ]); + push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', DIR => $TMP_DIR, UNLINK => 1 ) ]); spawn sub { &process_file($logfile, $tempfiles[-1]->[0]); }; $child_count++; } + last if ($interrupt); } - # Wait for all child dies - 1 while wait != -1; + # Wait for all child dies less the logger + while (scalar keys %RUNNING_PIDS > 1) { + my $kid = waitpid(-1, WNOHANG); + if ($kid > 0) { + delete $RUNNING_PIDS{$kid}; + } + usleep(500000); + } + # Terminate the process logger + foreach my $k (keys %RUNNING_PIDS) { + kill(10, $k); + %RUNNING_PIDS = (); + } # Load all data gathered by all the differents processes - if (!$abort) { - &init_stats_vars(); - foreach my $f (@tempfiles) { - my $fht = new IO::File; - $fht->open("< $f->[1]") or die "FATAL: can't open file $f->[1], $!\n"; - &load_stats($fht); - $fht->close(); - } + &init_stats_vars(); + foreach my $f (@tempfiles) { + next if (!-e "$f->[1]" || -z "$f->[1]"); + my $fht = new IO::File; + $fht->open("< $f->[1]") or die "FATAL: can't open file $f->[1], $!\n"; + &load_stats($fht); + $fht->close(); } } else { # Multiprocessing disabled, parse log files one by one foreach my $logfile ( @given_log_files ) { - &process_file($logfile); + last if (&process_file($logfile)); } } -# A terminate signal has been received. -exit 1 if ($abort); - # Save last line parsed if ($last_parsed && scalar keys %last_line) { if (open(OUT, ">$last_parsed")) { @@ -729,7 +762,7 @@ if ($extension ne 'tsung') { my $t2 = Benchmark->new; $td = timediff($t2, $t1); -&logmsg('DEBUG', "the report generating took:" . timestr($td)); +&logmsg('DEBUG', "building reports took:" . timestr($td)); $td = timediff($t2, $t0); &logmsg('DEBUG', "the total execution time took:" . timestr($td)); @@ -837,6 +870,9 @@ Examples: # Log line prefix with syslog log output perl pgbadger --prefix 'user=%u,db=%d,client=%h,appname=%a' \ /pglog/postgresql-2012-08-21* + # Use my 8 CPUs to parse my 10GB file faster, really faster + perl pgbadger -j 8 /pglog/postgresql-9.1-main.log + Generate Tsung sessions XML file with select queries only: @@ -897,11 +933,15 @@ sub multiprocess_progressbar $0 = 'pgbadger logger'; + # Terminate the process when we doesn't read the complete file but must exit + local $SIG{USR1} = sub { + print STDERR "\n"; + exit 0; + }; my $timeout = 3; my $cursize = 0; my $nqueries = 0; my $nerrors = 0; - my $exit = 0; $pipe->reader(); while (my $r = <$pipe>) { chomp($r); @@ -909,10 +949,9 @@ sub multiprocess_progressbar $cursize += $infos[0]; $nqueries += $infos[1]; $nerrors += $infos[2]; - $exit = $infos[3] if ($#infos == 3); $cursize = $totalsize if ($cursize > $totalsize); print STDERR &progress_bar($cursize, $totalsize, 25, '=', $nqueries, $nerrors); - last if ($exit || ($cursize >= $totalsize)); + last if ($cursize >= $totalsize); } print STDERR "\n"; @@ -929,10 +968,11 @@ sub process_file my $old_queries_count = 0; my $old_errors_count = 0; my $current_offset = $start_offset || 0; + my $getout = 0; $0 = 'pgbadger parser'; - &init_stats_vars(); + &init_stats_vars() if ($tmpoutfile); &logmsg('DEBUG', "Starting to parse log file: $logfile"); @@ -959,7 +999,6 @@ sub process_file &logmsg('DEBUG', "Starting reading file $logfile..."); - my $getout = 0; if ($format eq 'csv') { require Text::CSV_XS; @@ -1004,7 +1043,7 @@ sub process_file next if ($from && ($from gt $prefix_vars{'t_timestamp'})); if ($to && ($to lt $prefix_vars{'t_timestamp'})) { if ($tmpoutfile) { - $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n"); + $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n"); $old_queries_count = $overall_stat{'queries_number'}; $old_errors_count = $overall_stat{'errors_number'}; $cursize = 0; @@ -1132,7 +1171,7 @@ sub process_file next if ($from && ($from gt $prefix_vars{'t_timestamp'})); if ($to && ($to lt $prefix_vars{'t_timestamp'})) { if ($tmpoutfile) { - $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n"); + $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n"); $old_queries_count = $overall_stat{'queries_number'}; $old_errors_count = $overall_stat{'errors_number'}; $cursize = 0; @@ -1220,7 +1259,7 @@ sub process_file next if ($from && ($from gt $prefix_vars{'t_timestamp'})); if ($to && ($to lt $prefix_vars{'t_timestamp'})) { if ($tmpoutfile) { - $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n"); + $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n"); $old_queries_count = $overall_stat{'queries_number'}; $old_errors_count = $overall_stat{'errors_number'}; $cursize = 0; @@ -1299,19 +1338,19 @@ sub process_file if (!$tmpoutfile) { if ($totalsize) { if (($stop_offset > 0) && ($format ne 'csv')) { - print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '='); - } else { + print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=',$overall_stat{'queries_number'},$overall_stat{'errors_number'}); + } elsif ($extension eq 'tsung') { print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile); + } else { + print STDERR &progress_bar($cursize, $totalsize, 25, '=', $overall_stat{'queries_number'},$overall_stat{'errors_number'}); } print STDERR "\n"; } } else { $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n"); - $old_queries_count = $overall_stat{'queries_number'}; - $old_errors_count = $overall_stat{'errors_number'}; - $cursize = 0; } } + %cur_info = (); if ($tmpoutfile) { @@ -1319,7 +1358,12 @@ sub process_file $tmpoutfile->close(); } - return 0; + # Inform the parent that it should stop parsing other files + if ($getout) { + kill(12, $parent_pid); + } + + return $getout; } # Store the current timestamp of the log line @@ -3925,6 +3969,7 @@ sub show_error_as_html sub load_stats { + my $fd = shift; my %stats = %{ fd_retrieve($fd) }; my %_overall_stat = %{$stats{overall_stat}}; @@ -3948,9 +3993,12 @@ sub load_stats my %_autovacuum_info = %{$stats{autovacuum_info}}; my %_autoanalyze_info = %{$stats{autoanalyze_info}}; + return if (!$_overall_stat{queries_number}); + ### overall_stat ### $overall_stat{queries_number} += $_overall_stat{queries_number}; + $overall_stat{'first_log_ts'} = $_overall_stat{'first_log_ts'} if not $overall_stat{'first_log_ts'} or $overall_stat{'first_log_ts'} gt $_overall_stat{'first_log_ts'}; @@ -3960,11 +4008,11 @@ sub load_stats or $overall_stat{'last_log_ts'} lt $_overall_stat{'last_log_ts'}; $overall_stat{first_query_ts} = $_overall_stat{first_query_ts} - if not defined $overall_stat{first_query_ts} + if not $overall_stat{first_query_ts} or $overall_stat{first_query_ts} gt $_overall_stat{first_query_ts}; $overall_stat{last_query_ts} = $_overall_stat{last_query_ts} - if not defined $overall_stat{last_query_ts} + if not $overall_stat{last_query_ts} or $overall_stat{last_query_ts} lt $_overall_stat{last_query_ts}; $overall_stat{errors_number} += $_overall_stat{errors_number};