]> granicus.if.org Git - pgbadger/commitdiff
Fix freeze of pgbadger in multiprocess mode with -e or -d filtering options
authorDarold Gilles <gilles@darold.net>
Wed, 20 Feb 2013 20:37:49 +0000 (21:37 +0100)
committerDarold Gilles <gilles@darold.net>
Wed, 20 Feb 2013 20:37:49 +0000 (21:37 +0100)
pgbadger

index 9968b8e429f56f64316b5bcea12ef2b7dba308af..4b517bafd9fc5cfb3ba74ec4aea24997d8e5f8f0 100755 (executable)
--- a/pgbadger
+++ b/pgbadger
@@ -42,12 +42,18 @@ setlocale(LC_ALL,     'C');
 use File::Temp qw/ :seekable tempfile /;
 use IO::Handle;
 use IO::Pipe;
-
+use Time::HiRes qw/usleep/;
 
 $VERSION = '3.0';
 
 $SIG{'CHLD'} = 'DEFAULT';
 
+my $TMP_DIR      = '/tmp';
+my %RUNNING_PIDS = ();
+my @tempfiles    = ();
+my $parent_pid   = $$;
+my $interrupt    = 0;
+
 ####
 # method used to fork as many child as wanted
 ##
@@ -65,6 +71,7 @@ sub spawn
                print STDERR "Error: cannot fork: $!\n";
                return;
        } elsif ($pid) {
+               $RUNNING_PIDS{$pid} = $pid;
                return; # the parent
        }
        # the child -- go spawn
@@ -74,21 +81,28 @@ sub spawn
        exit &$coderef();
 }
 
+# Informa the parent that it should stop iterate on parsing other files
+sub stop_parsing
+{
+       $interrupt = 1;
+}
+
 # With multiprocess we need to wait all childs
-my $abort = 0;
 sub wait_child
 {
         my $sig = shift;
-       $abort = 1;
         print STDERR "Received terminating signal ($sig).\n";
         1 while wait != -1;
         $SIG{INT} = \&wait_child;
         $SIG{TERM} = \&wait_child;
+       foreach my $f (@tempfiles) {
+               unlink("$f->[1]") if (-e "$f->[1]");
+       }
        _exit(0);
 }
 $SIG{INT} = \&wait_child;
 $SIG{TERM} = \&wait_child;
-
+$SIG{USR2} = \&stop_parsing;
 
 $| = 1;
 
@@ -622,17 +636,28 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) {
        };
 
        # Parse each log file following the multiprocess mode chosen (-j or -J)
-       my @tempfiles = ();
        foreach my $logfile ( @given_log_files ) {
-               while ($child_count >= $parallel_process) { $child_count-- if (wait); }
-               last if ($abort);
+               while ($child_count >= $parallel_process) {
+                       my $kid = waitpid(-1, WNOHANG);
+                       if ($kid > 0) {
+                               $child_count--;
+                               delete $RUNNING_PIDS{$kid};
+                       }
+                       usleep(500000);
+               }
                if ($queue_size > 1) {
                        # Create multiple process to parse one log file by chunks of data
                        my @chunks = &split_logfile($logfile);
                        for (my $i = 0; $i < $#chunks; $i++) {
-                               while ($child_count >= $parallel_process) { $child_count-- if (wait); }
-                               last if ($abort);
-                               push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ) ]);
+                               while ($child_count >= $parallel_process) {
+                                       my $kid = waitpid(-1, WNOHANG);
+                                       if ($kid > 0) {
+                                               $child_count--;
+                                               delete $RUNNING_PIDS{$kid};
+                                       }
+                                       usleep(500000);
+                               }
+                               push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', DIR => $TMP_DIR, UNLINK => 1 ) ]);
                                spawn sub {
                                        &process_file($logfile, $tempfiles[-1]->[0], $chunks[$i], $chunks[$i+1]);
                                };
@@ -641,38 +666,46 @@ if ( ($queue_size > 1) || ($job_per_file > 1) ) {
                } else {
 
                        # Create on process per log files to parse
-                       push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ) ]);
+                       push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', DIR => $TMP_DIR, UNLINK => 1 ) ]);
                        spawn sub {
                                &process_file($logfile, $tempfiles[-1]->[0]);
                        };
                        $child_count++;
                }
+               last if ($interrupt);
        }
 
-       # Wait for all child dies
-       1 while wait != -1;
+       # Wait for all child dies less the logger
+       while (scalar keys %RUNNING_PIDS > 1) {
+               my $kid = waitpid(-1, WNOHANG);
+               if ($kid > 0) {
+                       delete $RUNNING_PIDS{$kid};
+               }
+               usleep(500000);
+       }
+       # Terminate the process logger
+       foreach my $k (keys %RUNNING_PIDS) {
+               kill(10, $k);
+               %RUNNING_PIDS = ();
+       }
 
        # Load all data gathered by all the differents processes
-       if (!$abort) {
-               &init_stats_vars();
-               foreach my $f (@tempfiles) {
-                       my $fht = new IO::File;
-                       $fht->open("< $f->[1]") or die "FATAL: can't open file $f->[1], $!\n";
-                       &load_stats($fht);
-                       $fht->close();
-               }
+       &init_stats_vars();
+       foreach my $f (@tempfiles) {
+               next if (!-e "$f->[1]" || -z "$f->[1]");
+               my $fht = new IO::File;
+               $fht->open("< $f->[1]") or die "FATAL: can't open file $f->[1], $!\n";
+               &load_stats($fht);
+               $fht->close();
        }
 
 } else {
        # Multiprocessing disabled, parse log files one by one
        foreach my $logfile ( @given_log_files ) {
-               &process_file($logfile);
+               last if (&process_file($logfile));
        }
 }
 
-# A terminate signal has been received.
-exit 1 if ($abort);
-
 # Save last line parsed
 if ($last_parsed && scalar keys %last_line) {
        if (open(OUT, ">$last_parsed")) {
@@ -729,7 +762,7 @@ if ($extension ne 'tsung') {
 
 my $t2 = Benchmark->new;
 $td = timediff($t2, $t1);
-&logmsg('DEBUG', "the report generating took:" . timestr($td));
+&logmsg('DEBUG', "building reports took:" . timestr($td));
 $td = timediff($t2, $t0);
 &logmsg('DEBUG', "the total execution time took:" . timestr($td));
 
@@ -837,6 +870,9 @@ Examples:
        # Log line prefix with syslog log output
        perl pgbadger --prefix 'user=%u,db=%d,client=%h,appname=%a' \
                        /pglog/postgresql-2012-08-21*
+       # Use my 8 CPUs to parse my 10GB file faster, really faster
+       perl pgbadger -j 8 /pglog/postgresql-9.1-main.log
+
 
 Generate Tsung sessions XML file with select queries only:
 
@@ -897,11 +933,15 @@ sub multiprocess_progressbar
 
        $0 = 'pgbadger logger';
 
+       # Terminate the process when we doesn't read the complete file but must exit
+       local $SIG{USR1} = sub {
+               print STDERR "\n";
+               exit 0;
+       };
        my $timeout  = 3;
        my $cursize  = 0;
        my $nqueries = 0;
        my $nerrors  = 0;
-       my $exit     = 0;
        $pipe->reader();
        while (my $r = <$pipe>) {
                chomp($r);
@@ -909,10 +949,9 @@ sub multiprocess_progressbar
                $cursize  += $infos[0];
                $nqueries += $infos[1];
                $nerrors  += $infos[2];
-               $exit     =  $infos[3] if ($#infos == 3);
                $cursize = $totalsize if ($cursize > $totalsize);
                print STDERR &progress_bar($cursize, $totalsize, 25, '=', $nqueries, $nerrors);
-               last if ($exit || ($cursize >= $totalsize));
+               last if ($cursize >= $totalsize);
        }
        print STDERR "\n";
 
@@ -929,10 +968,11 @@ sub process_file
        my $old_queries_count = 0;
        my $old_errors_count  = 0;
        my $current_offset    = $start_offset || 0;
+       my $getout            = 0;
 
        $0 = 'pgbadger parser';
 
-       &init_stats_vars();
+       &init_stats_vars() if ($tmpoutfile);
 
        &logmsg('DEBUG', "Starting to parse log file: $logfile");
 
@@ -959,7 +999,6 @@ sub process_file
 
        &logmsg('DEBUG', "Starting reading file $logfile...");
 
-       my $getout = 0;
        if ($format eq 'csv') {
 
                require Text::CSV_XS;
@@ -1004,7 +1043,7 @@ sub process_file
                        next if ($from && ($from gt $prefix_vars{'t_timestamp'}));
                        if ($to && ($to lt $prefix_vars{'t_timestamp'})) {
                                if ($tmpoutfile) {
-                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n");
+                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
                                        $old_queries_count = $overall_stat{'queries_number'};
                                        $old_errors_count = $overall_stat{'errors_number'};
                                        $cursize = 0;
@@ -1132,7 +1171,7 @@ sub process_file
                                        next if ($from && ($from gt $prefix_vars{'t_timestamp'}));
                                        if ($to   && ($to lt $prefix_vars{'t_timestamp'})) {
                                                if ($tmpoutfile) {
-                                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n");
+                                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
                                                        $old_queries_count = $overall_stat{'queries_number'};
                                                        $old_errors_count = $overall_stat{'errors_number'};
                                                        $cursize = 0;
@@ -1220,7 +1259,7 @@ sub process_file
                                        next if ($from && ($from gt $prefix_vars{'t_timestamp'}));
                                        if ($to   && ($to lt $prefix_vars{'t_timestamp'})) {
                                                if ($tmpoutfile) {
-                                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . " exit\n");
+                                                       $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
                                                        $old_queries_count = $overall_stat{'queries_number'};
                                                        $old_errors_count = $overall_stat{'errors_number'};
                                                        $cursize = 0;
@@ -1299,19 +1338,19 @@ sub process_file
                if (!$tmpoutfile) {
                        if ($totalsize) {
                                if (($stop_offset > 0) && ($format ne 'csv')) {
-                                       print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
-                               } else {
+                                       print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=',$overall_stat{'queries_number'},$overall_stat{'errors_number'});
+                               } elsif ($extension eq 'tsung') {
                                        print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile);
+                               } else {
+                                       print STDERR &progress_bar($cursize, $totalsize, 25, '=', $overall_stat{'queries_number'},$overall_stat{'errors_number'});
                                }
                                print STDERR "\n";
                        }
                } else {
                        $pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
-                       $old_queries_count = $overall_stat{'queries_number'};
-                       $old_errors_count = $overall_stat{'errors_number'};
-                       $cursize = 0;
                }
        }
+
        %cur_info = ();
 
        if ($tmpoutfile) {
@@ -1319,7 +1358,12 @@ sub process_file
                $tmpoutfile->close();
        }
 
-       return 0;
+       # Inform the parent that it should stop parsing other files
+       if ($getout) {
+               kill(12, $parent_pid);
+       }
+
+       return $getout;
 }
 
 # Store the current timestamp of the log line
@@ -3925,6 +3969,7 @@ sub show_error_as_html
 
 sub load_stats
 {
+
        my $fd = shift;
        my %stats = %{ fd_retrieve($fd) };
        my %_overall_stat = %{$stats{overall_stat}};
@@ -3948,9 +3993,12 @@ sub load_stats
        my %_autovacuum_info = %{$stats{autovacuum_info}};
        my %_autoanalyze_info = %{$stats{autoanalyze_info}};
 
+       return if (!$_overall_stat{queries_number});
+
        ### overall_stat ###
 
        $overall_stat{queries_number} += $_overall_stat{queries_number};
+
        $overall_stat{'first_log_ts'} = $_overall_stat{'first_log_ts'}
                if not $overall_stat{'first_log_ts'}
                        or $overall_stat{'first_log_ts'} gt $_overall_stat{'first_log_ts'};
@@ -3960,11 +4008,11 @@ sub load_stats
                        or $overall_stat{'last_log_ts'} lt $_overall_stat{'last_log_ts'};
 
        $overall_stat{first_query_ts} = $_overall_stat{first_query_ts}
-               if not defined $overall_stat{first_query_ts}
+               if not $overall_stat{first_query_ts}
                        or $overall_stat{first_query_ts} gt $_overall_stat{first_query_ts};
 
        $overall_stat{last_query_ts} = $_overall_stat{last_query_ts}
-               if not defined $overall_stat{last_query_ts}
+               if not $overall_stat{last_query_ts}
                        or $overall_stat{last_query_ts} lt $_overall_stat{last_query_ts};
 
        $overall_stat{errors_number} += $_overall_stat{errors_number};