if ( ($queue_size > 1) && ($logfile !~ /\.(gz|bz2|zip|xz)$/i) ) {
# Create multiple processes to parse one log file by chunks of data
my @chunks = &split_logfile($logfile);
+ &logmsg('DEBUG', "The following boundaries will be used to parse file $logfile, " . join('|', @chunks));
for (my $i = 0; $i < $#chunks; $i++) {
while ($child_count >= $parallel_process) {
my $kid = waitpid(-1, WNOHANG);
die "FATAL: Abort signal received when processing to next chunk\n" if ($interrupt);
push(@tempfiles, [ tempfile('tmp_pgbadgerXXXX', SUFFIX => '.bin', DIR => $TMP_DIR, UNLINK => 1 ) ]);
spawn sub {
- &process_file($logfile, $tempfiles[-1]->[0], $chunks[$i], $chunks[$i+1]);
+ &process_file($logfile, $tempfiles[-1]->[0], $chunks[$i], $chunks[$i+1], $i);
};
$child_count++;
}
}
sleep(1);
}
- # This is the last file, so the progress bar can be closed
- #$pipe->print("QUIT\n");
# Terminate the process logger
foreach my $k (keys %RUNNING_PIDS) {
####
sub process_file
{
- my ($logfile, $tmpoutfile, $start_offset, $stop_offset) = @_;
+ my ($logfile, $tmpoutfile, $start_offset, $stop_offset, $chunk_pos) = @_;
my $old_queries_count = 0;
my $old_errors_count = 0;
# Get file handle and size of the file
my ($lfile, $totalsize) = &get_log_file($logfile);
+ # Reset the start position if the file is smaller than the current start offset
+ $start_offset = 0 if ($start_offset > $totalsize);
+ # Check if the first date in the log is after the last date saved
+ if (($format ne 'binary') && ($format ne 'csv')) {
+ if ($start_offset && !$chunk_pos) {
+ if (&check_file_changed($logfile, $saved_last_line{datetime}, 1)) {
+ &logmsg('DEBUG', "This file should be parsed from the beginning: $logfile");
+ &logmsg('DEBUG', "Reverting start offset $start_offset to 0 for file $logfile, stoppping offset is " . ($stop_offset || $totalsize));
+ $start_offset = 0;
+ }
+ # In non multiprocess mode, the progress bar must start at the offset
+ $cursize += $start_offset if (!$tmpoutfile);
+ }
+ }
+
if ($stop_offset > 0) {
$totalsize = $stop_offset - $start_offset;
}
$has_exclusion = 1;
}
$start_offset ||= 0;
- &logmsg('DEBUG', "Start parsing at offset $start_offset of file $logfile");
+ &logmsg('DEBUG', "Start parsing at offset $start_offset of file $logfile to $stop_offset");
if ($start_offset) {
+ # Move to the starting offset position in file
$lfile->seek($start_offset, 0);
}
while (my $line = <$lfile>) {
if (!$tmpoutfile) {
if ($progress && (($nlines % $NUMPROGRESS) == 0)) {
if ($totalsize) {
- if ($stop_offset > 0) {
- print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=');
- } else {
- print STDERR &progress_bar($cursize, $totalsize, 25, '=');
- }
+ print STDERR &progress_bar($cursize, $stop_offset || $totalsize, 25, '=');
} else {
print STDERR ".";
}
next if ($from && ($from gt $prefix_vars{'t_timestamp'}));
if ($to && ($to lt $prefix_vars{'t_timestamp'})) {
- if ($tmpoutfile) {
+ if (!$tmpoutfile) {
+ if ($totalsize) {
+ print STDERR &progress_bar($cursize, $stop_offset || $totalsize, 25, '=');
+ } else {
+ print STDERR ".";
+ }
+ } else {
$pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
$old_queries_count = $overall_stat{'queries_number'};
$old_errors_count = $overall_stat{'errors_number'};
}
next if ($from && ($from gt $prefix_vars{'t_timestamp'}));
if ($to && ($to lt $prefix_vars{'t_timestamp'})) {
- if ($tmpoutfile) {
+ if (!$tmpoutfile) {
+ if ($totalsize) {
+ print STDERR &progress_bar($cursize, $stop_offset || $totalsize, 25, '=');
+ } else {
+ print STDERR ".";
+ }
+ } else {
$pipe->print("$cursize " . ($overall_stat{'queries_number'} - $old_queries_count) . " " . ($overall_stat{'errors_number'} - $old_errors_count) . "\n");
$old_queries_count = $overall_stat{'queries_number'};
$old_errors_count = $overall_stat{'errors_number'};
if ($last_parsed) {
$last_line{current_pos} = $current_offset;
}
+ # Forward the progress bar to the ending point in multiprocess mode
+ if ($progress && $tmpoutfile) {
+ $pipe->print("$start_offset 0 0\n");
+ }
}
close $lfile;
if ($progress && ($getout != 1)) {
if (!$tmpoutfile) {
if ($totalsize) {
- if (($stop_offset > 0) && ($format ne 'csv')) {
- print STDERR &progress_bar($cursize - $start_offset, $stop_offset, 25, '=',$overall_stat{'queries_number'},$overall_stat{'errors_number'}, $logfile);
- } else {
- print STDERR &progress_bar($cursize+$start_offset, $totalsize, 25, '=', $overall_stat{'queries_number'},$overall_stat{'errors_number'}, $logfile);
- }
+ print STDERR &progress_bar($cursize, $stop_offset || $totalsize, 25, '=',$overall_stat{'queries_number'},$overall_stat{'errors_number'}, $logfile);
print STDERR "\n";
}
} else {
# start up. Here we just verify that the first date in file is before the last incremental date.
sub check_file_changed
{
- my ($file, $saved_date) = @_;
+ my ($file, $saved_date, $look_at_beginning) = @_;
my ($lfile, $totalsize, $iscompressed) = &get_log_file($file);
my $CURRENT_DATE = $gyear . sprintf("%02d", $gmon + 1) . sprintf("%02d", $gmday);
%prefix_vars = ();
- # do not seek if filesize is smaller than the seek position
- if ($saved_last_line{current_pos} < $totalsize) {
- $lfile->seek($saved_last_line{current_pos} || 0, 0);
+ # If seeking is not explicitly disabled
+ if (!$look_at_beginning) {
+ # do not seek if filesize is smaller than the seek position
+ if ($saved_last_line{current_pos} < $totalsize) {
+ $lfile->seek($saved_last_line{current_pos} || 0, 0);
+ }
}
+
my $more_lines = 0;
while (my $line = <$lfile>) {
$lfile->seek($pos, 0);
# Move the offset to the BEGINNING of each line, because the logic in process_file requires it
$pos= $pos + length(<$lfile>) - 1;
- push(@chunks, $pos);
+ push(@chunks, $pos) if ($pos < $totalsize);
}
+ last if ($pos >= $totalsize);
$i++;
}
$lfile->close();