]> granicus.if.org Git - pgbadger/commitdiff
Allow pgbadger to use several cores, aka multiprocessing. Add -j | --jobs option...
authorDarold Gilles <gilles@darold.net>
Thu, 31 Jan 2013 16:28:35 +0000 (17:28 +0100)
committerDarold Gilles <gilles@darold.net>
Thu, 31 Jan 2013 16:28:35 +0000 (17:28 +0100)
pgbadger [changed mode: 0755->0644]

old mode 100755 (executable)
new mode 100644 (file)
index 747d281..2a247c3
--- a/pgbadger
+++ b/pgbadger
@@ -34,12 +34,55 @@ use Benchmark;
 use File::Basename;
 use Storable qw(store_fd fd_retrieve);
 use Time::Local 'timegm_nocheck';
-use POSIX qw(locale_h);
+use POSIX qw(locale_h sys_wait_h);
 setlocale(LC_NUMERIC, '');
 setlocale(LC_ALL,     'C');
+use File::Temp qw/ :seekable /;
+use Proc::Queue size => 1, ':all';
 
 $VERSION = '2.3';
 
+####
+# method used to fork as many child as wanted
+##
+my @global_pids = ();
+sub spawn
+{
+       my $coderef = shift;
+
+       unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') {
+               print "usage: spawn CODEREF";
+               exit 0;
+       }
+
+       my $pid;
+       if (!defined($pid = fork)) {
+               print STDERR "Error: cannot fork: $!\n";
+               return;
+       } elsif ($pid) {
+               push(@global_pids, $pid);
+               return; # the parent
+       }
+       # the child -- go spawn
+       $< = $>;
+       $( = $); # suid progs only
+
+       exit &$coderef();
+}
+
+# With multiprocess we need to wait all childs
+sub wait_child
+{
+
+        1 while wait != -1;
+        $SIG{INT} = \&wait_child;
+        $SIG{TERM} = \&wait_child;
+}
+$SIG{INT} = \&wait_child;
+$SIG{TERM} = \&wait_child;
+$SIG{QUIT} = \&wait_child;
+$SIG{'CHLD'} = 'DEFAULT';
+
 $| = 1;
 
 # Global variables overridden during install
@@ -107,6 +150,7 @@ my $remove_comment          = 0;
 my $select_only             = 0;
 my $enable_log_min_duration = 0;
 my $tsung_queries           = 0;
+my $queue_size              = 0;
 
 my $NUMPROGRESS = 10000;
 my @DIMENSIONS  = (800, 300);
@@ -137,6 +181,7 @@ my $result = GetOptions(
        "G|nograph!"               => \$nograph,
        "h|help!"                  => \$help,
        "i|ident=s"                => \$ident,
+       "j|jobs=s"                 => \$queue_size,
        "l|last-parsed=s"          => \$last_parsed,
        "m|maxlength=i"            => \$maxlength,
        "N|appname=s"              => \@dbappname,
@@ -281,7 +326,9 @@ $outdir = $infs[1] . '/';
 $graph = 0 unless ($extension eq 'html' or $extension eq 'binary' );
 $graph = 0 if ($nograph);
 
+# Set some default values
 my $end_top = $top - 1;
+$queue_size ||= 1;
 
 if ($extension eq 'tsung') {
 
@@ -567,7 +614,56 @@ my @given_log_files = ( @log_files );
 # log files must be erase when loading stats from binary format
 @log_files = () if $format eq 'binary';
 
-foreach my $logfile ( @given_log_files ) {
+if ($queue_size > 1) {
+
+       Proc::Queue::size($queue_size);
+
+       my @tempfiles = ();
+       if ( ($#given_log_files > 0) || ($format eq 'csv') ) {
+               my $ix = 0;
+               foreach my $logfile ( @given_log_files ) {
+                       push(@tempfiles, File::Temp->new( TEMPLATE => 'tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ));
+                       #push(@tempfiles, "tmp_pgbadger$ix.bin");
+                       spawn sub {
+                               #&process_file($logfile, "tmp_pgbadger$ix.bin");
+                               &process_file($logfile, $tempfiles[-1]);
+                       };
+                       $ix++;
+
+               }
+       } else {
+               my @chunks = &split_logfile($given_log_files[0]);
+               for (my $i = 0; $i < $#chunks; $i++) {
+                       push(@tempfiles, File::Temp->new( TEMPLATE => 'tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 ));
+                       spawn sub {
+                               &process_file($given_log_files[0], $tempfiles[-1], $chunks[$i], $chunks[$i+1]);
+                       };
+               } 
+       }
+       1 while wait != -1;
+
+       # Load all data gathered by the differents processes
+       my $bfile = undef;
+       foreach my $f (@tempfiles) {
+               #open($bfile,"<",$f) || die "FATAL: cannot read temporary binary file $f. $!\n";
+               #&load_stats($bfile);
+               $f->seek( 0, 0 );
+               &load_stats($f);
+               $f->close();
+               #$bfile->close();
+               #unlink($f);
+       }
+
+} else {
+       foreach my $logfile ( @given_log_files ) {
+               &process_file($logfile);
+       }
+}
+
+ # End of main loop
+sub process_file
+{
+       my ($logfile, $tmpoutfile, $start_offset, $stop_offset) = @_;
 
        &logmsg('DEBUG', "Starting to parse log file: $logfile");
 
@@ -582,6 +678,9 @@ foreach my $logfile ( @given_log_files ) {
 
        # Get file handle and size of the file
        my ($lfile, $totalsize) = &open_log_file($logfile);
+       if ($stop_offset) {
+               $totalsize = $stop_offset - $start_offset;
+       }
 
        &logmsg('DEBUG', "Starting reading file $logfile...");
        
@@ -687,6 +786,10 @@ foreach my $logfile ( @given_log_files ) {
                my $cur_pid = '';
                my @matches = ();
                my $goon = 0;
+               if ($start_offset) {
+                       $lfile->seek($start_offset, 0);
+                       $cursize += $start_offset;
+               }
                while (my $line = <$lfile>) {
                        $cursize += length($line);
                        chomp($line);
@@ -696,7 +799,11 @@ foreach my $logfile ( @given_log_files ) {
 
                        if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
                                if ($totalsize) {
-                                       print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+                                       if ($stop_offset) {
+                                               print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
+                                       } else {
+                                               print STDERR &progress_bar($cursize, $totalsize, 25, '=');
+                                       }
                                } else {
                                        print STDERR ".";
                                }
@@ -881,6 +988,7 @@ foreach my $logfile ( @given_log_files ) {
                                # unknown format
                                &logmsg('DEBUG', "Unknown line format: $line");
                        }
+                       last if ($stop_offset && ($cursize > $stop_offset));
                }
        }
        close $lfile;
@@ -898,12 +1006,21 @@ foreach my $logfile ( @given_log_files ) {
 
        if ($progress) {
                if ($totalsize) {
-                       print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile);
+                       if ($stop_offset && ($format ne 'csv')) {
+                               print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=', $logfile);
+                       } else {
+                               print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile);
+                       }
                }
                print STDERR "\n";
        }
-
-} # End of main loop
+       if ($tmpoutfile) {
+               #my $tfh = new IO::File ">$tmpoutfile";
+               #&dump_as_binary($tfh);
+               &dump_as_binary($tmpoutfile);
+               #$tfh->close();
+       }
+}
 
 # Save last line parsed
 if ($last_parsed && scalar keys %last_line) {
@@ -935,7 +1052,7 @@ if ($extension ne 'tsung') {
                        &dump_as_text();
                }
        } elsif ($extension eq 'binary') {
-               &dump_as_binary();
+               &dump_as_binary($fh);
        } else {
                # Create instance to prettify SQL query
                if (!$noprettify) {
@@ -996,6 +1113,8 @@ Options:
     -G | --nograph         : disable graphs on HTML output. Enable by default.
     -h | --help            : show this message and exit.
     -i | --ident name      : programname used as syslog ident. Default: postgres
+    -j | --jobs number     : number of jobs to run at same time. Default is 1,
+                            run as single process.
     -l | --last-parsed file: allow incremental log parsing by registering the
                              last datetime and line parsed. Useful if you want
                              to watch errors since last run or if you want one
@@ -3672,8 +3791,8 @@ sub load_stats
        my $_first_log_timestamp = $stats{first_log_timestamp};
        my $_last_log_timestamp = $stats{last_log_timestamp};
        my @_log_files = @{$stats{log_files}};
-       my %_autovacuum_info = @{$stats{autovacuum_info}};
-       my %_autoanalyze_info = @{$stats{autoanalyze_info}};
+       my %_autovacuum_info = %{$stats{autovacuum_info}};
+       my %_autoanalyze_info = %{$stats{autoanalyze_info}};
 
        ### overall_stat ###
 
@@ -4040,6 +4159,8 @@ sub load_stats
 
 sub dump_as_binary
 {
+       my $lfh = shift();
+
        store_fd({
                'overall_stat' => \%overall_stat,
                'normalyzed_info' => \%normalyzed_info,
@@ -4062,7 +4183,7 @@ sub dump_as_binary
                'log_files' => \@log_files,
                'autovacuum_info' => \%autovacuum_info,
                'autoanalyze_info' => \%autoanalyze_info
-       }, $fh) || die ("Couldn't save binary data to «$outfile»!\n");
+       }, $lfh) || die ("Couldn't save binary data to «$outfile»!\n");
 }
 
 # Highlight SQL code
@@ -5696,6 +5817,50 @@ sub open_log_file
        return ($lfile, $totalsize);
 }
 
+sub split_logfile
+{
+       my $logf = shift;
+
+       # get file size
+       my $totalsize = (stat("$logf"))[7] || 0;
+
+       # Real size of the file is unknown, try to find it
+       # bz2 does not report real size
+       if ($logf =~ /\.(gz|zip)/i) {
+               $totalsize = 0;
+               my $cmd_file_size = $gzip_uncompress_size;
+               if ($logf =~ /\.zip/i) {
+                       $cmd_file_size = $zip_uncompress_size;
+               }
+               $cmd_file_size =~ s/\%f/$logf/g;
+               $totalsize = `$cmd_file_size`;
+               chomp($totalsize);
+       } elsif ($logf =~ /\.bz2/i) {
+               $totalsize = 0;
+       }
+
+       return if (!$totalsize);
+
+       my @chunks = (0);
+       my $i = 1;
+       while ($i < $queue_size) {
+               push(@chunks, int(($totalsize/$queue_size) * $i));
+               $i++;
+       }
+       push(@chunks, $totalsize);
+
+       return @chunks;
+}
+
+
+# Inclusion of Perl package Proc::Queue
+# Copyright (C) 2001, 2002, 2003, 2005 Salvador Fandino Garcia
+# This library is free software; you can redistribute it and/or modify
+# it under the same terms as Perl itself.
+{
+       print "Proc::Queue should be inserted here\n";
+}
+
 __DATA__
 
 <script type="text/javascript">