From: Darold Gilles Date: Thu, 31 Jan 2013 16:28:35 +0000 (+0100) Subject: Allow pgbadger to use several cores, aka multiprocessing. Add -j | --jobs option... X-Git-Tag: v3.2~52 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=507b394b018d6e01c8d13ee456081798a0fed8cb;p=pgbadger Allow pgbadger to use several cores, aka multiprocessing. Add -j | --jobs option to specify the number of core to use. --- diff --git a/pgbadger b/pgbadger old mode 100755 new mode 100644 index 747d281..2a247c3 --- a/pgbadger +++ b/pgbadger @@ -34,12 +34,55 @@ use Benchmark; use File::Basename; use Storable qw(store_fd fd_retrieve); use Time::Local 'timegm_nocheck'; -use POSIX qw(locale_h); +use POSIX qw(locale_h sys_wait_h); setlocale(LC_NUMERIC, ''); setlocale(LC_ALL, 'C'); +use File::Temp qw/ :seekable /; +use Proc::Queue size => 1, ':all'; $VERSION = '2.3'; +#### +# method used to fork as many child as wanted +## +my @global_pids = (); +sub spawn +{ + my $coderef = shift; + + unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE') { + print "usage: spawn CODEREF"; + exit 0; + } + + my $pid; + if (!defined($pid = fork)) { + print STDERR "Error: cannot fork: $!\n"; + return; + } elsif ($pid) { + push(@global_pids, $pid); + return; # the parent + } + # the child -- go spawn + $< = $>; + $( = $); # suid progs only + + exit &$coderef(); +} + +# With multiprocess we need to wait all childs +sub wait_child +{ + + 1 while wait != -1; + $SIG{INT} = \&wait_child; + $SIG{TERM} = \&wait_child; +} +$SIG{INT} = \&wait_child; +$SIG{TERM} = \&wait_child; +$SIG{QUIT} = \&wait_child; +$SIG{'CHLD'} = 'DEFAULT'; + $| = 1; # Global variables overridden during install @@ -107,6 +150,7 @@ my $remove_comment = 0; my $select_only = 0; my $enable_log_min_duration = 0; my $tsung_queries = 0; +my $queue_size = 0; my $NUMPROGRESS = 10000; my @DIMENSIONS = (800, 300); @@ -137,6 +181,7 @@ my $result = GetOptions( "G|nograph!" => \$nograph, "h|help!" => \$help, "i|ident=s" => \$ident, + "j|jobs=s" => \$queue_size, "l|last-parsed=s" => \$last_parsed, "m|maxlength=i" => \$maxlength, "N|appname=s" => \@dbappname, @@ -281,7 +326,9 @@ $outdir = $infs[1] . '/'; $graph = 0 unless ($extension eq 'html' or $extension eq 'binary' ); $graph = 0 if ($nograph); +# Set some default values my $end_top = $top - 1; +$queue_size ||= 1; if ($extension eq 'tsung') { @@ -567,7 +614,56 @@ my @given_log_files = ( @log_files ); # log files must be erase when loading stats from binary format @log_files = () if $format eq 'binary'; -foreach my $logfile ( @given_log_files ) { +if ($queue_size > 1) { + + Proc::Queue::size($queue_size); + + my @tempfiles = (); + if ( ($#given_log_files > 0) || ($format eq 'csv') ) { + my $ix = 0; + foreach my $logfile ( @given_log_files ) { + push(@tempfiles, File::Temp->new( TEMPLATE => 'tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 )); + #push(@tempfiles, "tmp_pgbadger$ix.bin"); + spawn sub { + #&process_file($logfile, "tmp_pgbadger$ix.bin"); + &process_file($logfile, $tempfiles[-1]); + }; + $ix++; + + } + } else { + my @chunks = &split_logfile($given_log_files[0]); + for (my $i = 0; $i < $#chunks; $i++) { + push(@tempfiles, File::Temp->new( TEMPLATE => 'tmp_pgbadgerXXXX', SUFFIX => '.bin', TMPDIR => 1, UNLINK => 1 )); + spawn sub { + &process_file($given_log_files[0], $tempfiles[-1], $chunks[$i], $chunks[$i+1]); + }; + } + } + 1 while wait != -1; + + # Load all data gathered by the differents processes + my $bfile = undef; + foreach my $f (@tempfiles) { + #open($bfile,"<",$f) || die "FATAL: cannot read temporary binary file $f. $!\n"; + #&load_stats($bfile); + $f->seek( 0, 0 ); + &load_stats($f); + $f->close(); + #$bfile->close(); + #unlink($f); + } + +} else { + foreach my $logfile ( @given_log_files ) { + &process_file($logfile); + } +} + + # End of main loop +sub process_file +{ + my ($logfile, $tmpoutfile, $start_offset, $stop_offset) = @_; &logmsg('DEBUG', "Starting to parse log file: $logfile"); @@ -582,6 +678,9 @@ foreach my $logfile ( @given_log_files ) { # Get file handle and size of the file my ($lfile, $totalsize) = &open_log_file($logfile); + if ($stop_offset) { + $totalsize = $stop_offset - $start_offset; + } &logmsg('DEBUG', "Starting reading file $logfile..."); @@ -687,6 +786,10 @@ foreach my $logfile ( @given_log_files ) { my $cur_pid = ''; my @matches = (); my $goon = 0; + if ($start_offset) { + $lfile->seek($start_offset, 0); + $cursize += $start_offset; + } while (my $line = <$lfile>) { $cursize += length($line); chomp($line); @@ -696,7 +799,11 @@ foreach my $logfile ( @given_log_files ) { if ($progress && (($nlines % $NUMPROGRESS) == 0)) { if ($totalsize) { - print STDERR &progress_bar($cursize, $totalsize, 25, '='); + if ($stop_offset) { + print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '='); + } else { + print STDERR &progress_bar($cursize, $totalsize, 25, '='); + } } else { print STDERR "."; } @@ -881,6 +988,7 @@ foreach my $logfile ( @given_log_files ) { # unknown format &logmsg('DEBUG', "Unknown line format: $line"); } + last if ($stop_offset && ($cursize > $stop_offset)); } } close $lfile; @@ -898,12 +1006,21 @@ foreach my $logfile ( @given_log_files ) { if ($progress) { if ($totalsize) { - print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile); + if ($stop_offset && ($format ne 'csv')) { + print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=', $logfile); + } else { + print STDERR &progress_bar($cursize, $totalsize, 25, '=', $logfile); + } } print STDERR "\n"; } - -} # End of main loop + if ($tmpoutfile) { + #my $tfh = new IO::File ">$tmpoutfile"; + #&dump_as_binary($tfh); + &dump_as_binary($tmpoutfile); + #$tfh->close(); + } +} # Save last line parsed if ($last_parsed && scalar keys %last_line) { @@ -935,7 +1052,7 @@ if ($extension ne 'tsung') { &dump_as_text(); } } elsif ($extension eq 'binary') { - &dump_as_binary(); + &dump_as_binary($fh); } else { # Create instance to prettify SQL query if (!$noprettify) { @@ -996,6 +1113,8 @@ Options: -G | --nograph : disable graphs on HTML output. Enable by default. -h | --help : show this message and exit. -i | --ident name : programname used as syslog ident. Default: postgres + -j | --jobs number : number of jobs to run at same time. Default is 1, + run as single process. -l | --last-parsed file: allow incremental log parsing by registering the last datetime and line parsed. Useful if you want to watch errors since last run or if you want one @@ -3672,8 +3791,8 @@ sub load_stats my $_first_log_timestamp = $stats{first_log_timestamp}; my $_last_log_timestamp = $stats{last_log_timestamp}; my @_log_files = @{$stats{log_files}}; - my %_autovacuum_info = @{$stats{autovacuum_info}}; - my %_autoanalyze_info = @{$stats{autoanalyze_info}}; + my %_autovacuum_info = %{$stats{autovacuum_info}}; + my %_autoanalyze_info = %{$stats{autoanalyze_info}}; ### overall_stat ### @@ -4040,6 +4159,8 @@ sub load_stats sub dump_as_binary { + my $lfh = shift(); + store_fd({ 'overall_stat' => \%overall_stat, 'normalyzed_info' => \%normalyzed_info, @@ -4062,7 +4183,7 @@ sub dump_as_binary 'log_files' => \@log_files, 'autovacuum_info' => \%autovacuum_info, 'autoanalyze_info' => \%autoanalyze_info - }, $fh) || die ("Couldn't save binary data to «$outfile»!\n"); + }, $lfh) || die ("Couldn't save binary data to «$outfile»!\n"); } # Highlight SQL code @@ -5696,6 +5817,50 @@ sub open_log_file return ($lfile, $totalsize); } +sub split_logfile +{ + my $logf = shift; + + # get file size + my $totalsize = (stat("$logf"))[7] || 0; + + # Real size of the file is unknown, try to find it + # bz2 does not report real size + if ($logf =~ /\.(gz|zip)/i) { + $totalsize = 0; + my $cmd_file_size = $gzip_uncompress_size; + if ($logf =~ /\.zip/i) { + $cmd_file_size = $zip_uncompress_size; + } + $cmd_file_size =~ s/\%f/$logf/g; + $totalsize = `$cmd_file_size`; + chomp($totalsize); + } elsif ($logf =~ /\.bz2/i) { + $totalsize = 0; + } + + return if (!$totalsize); + + my @chunks = (0); + my $i = 1; + while ($i < $queue_size) { + push(@chunks, int(($totalsize/$queue_size) * $i)); + $i++; + } + push(@chunks, $totalsize); + + return @chunks; +} + + +# Inclusion of Perl package Proc::Queue +# Copyright (C) 2001, 2002, 2003, 2005 Salvador Fandino Garcia +# This library is free software; you can redistribute it and/or modify +# it under the same terms as Perl itself. +{ + print "Proc::Queue should be inserted here\n"; +} + __DATA__