ApacheLog-Parser

 view release on metacpan or  search on metacpan

bin/loghack  view on Meta::CPAN

}
# Build a short display name for a file path.
#
# The last path component is passed through name_as_date() and wrapped as
# '*.<date>.*'.  If the path has any leading components, the one closest
# to the file is prefixed, joined with a '/'.
sub nice_name {
  my ($name) = @_;
  my @parts = split(/\/+/, $name);
  my $leaf = pop(@parts);
  my $pattern = '*.' . name_as_date($leaf) . '.*';
  return(@parts ? join("/", $parts[-1], $pattern) : $pattern);
}
# Write a marker file named "$dir$md5" whose single line is the basename
# of $file.  $dir is created first if needed (via want_dir) and is
# concatenated with $md5 as-is, so the caller supplies any separator.
#
# An existing marker is left untouched (a "skipping" warning is emitted).
# Dies if the marker cannot be opened or closed.  $opt is accepted but not
# used here.
sub record_source {
  my ($opt, $file, $dir, $md5) = @_;

  my $target = $dir . $md5;
  want_dir($dir);

  # never overwrite a marker that is already there
  if(-e $target) {
    warn "skipping $target ($file)\n";
    return;
  }

  open(my $out, '>', $target) or die "cannot write '$target' $!";
  print {$out} File::Basename::basename($file), "\n";
  close($out) or die "cannot write '$target' $!";
}
# Make sure $dir exists as a directory, creating it (one level only) when
# it does not.  Dies with "cannot create ..." if it still is not a
# directory after the mkdir attempt.
sub want_dir {
  my ($dir) = @_;

  -d $dir and return;

  # A failed mkdir is fine as long as the directory is there afterwards
  # (e.g. something else created it in the meantime).
  mkdir($dir) or -d $dir or die "cannot create $dir $!";
}

# Worker loop: chdir into $opt->{daemon}, then treat every line on STDIN
# as a tab-separated argument list for main().  After each call a ".done"
# line is printed on the (unbuffered) currently selected output handle.
# Returns when STDIN reaches end-of-file.
sub daemon {
  my ($opt, @args) = @_;

  my $dir = $opt->{daemon};
  unless(chdir($dir)) {
    die "no such dir $dir\n";
  }

  # the ".done" marker must go out immediately, not sit in a buffer
  $| = 1;

  while(defined(my $request = <STDIN>)) {
    chomp($request);
    my @call = split(/\t/, $request);
    main(@call);
    print ".done\n";
  }
}

# Launch one worker process per entry in @hosts and register its output
# handles with a fresh IO::Select object.
#
# Each worker runs "<basename of $0> -d $dir".  A trailing "#<digits>" on
# a host entry is stripped to get the real host name; 'localhost' is run
# directly and anything else is run through ssh.
#
# Returns ($sel, %track) where %track maps each worker pid to a hash with
# the keys stdin, stdout, stderr, host, sel_o and sel_e.  The sel_o/sel_e
# values are the [handle, pid, 'stdout'|'stderr'] array refs which were
# added to $sel.
sub start_cluster {
  my ($dir, @hosts) = @_;
  require File::Basename;
  require IPC::Open3;
  require IO::Select;
  require IO::Handle;
  my $sel = IO::Select->new();
  my %track;
  my $prog = File::Basename::basename($0);
  foreach my $host (@hosts) {
    (my $realhost = $host) =~ s/#\d+$//;
    my $stdin;
    # open3 only keeps stderr separate from stdout when it is handed a
    # real handle for it, so both readers are created up front.
    my ($stdout, $stderr) = (IO::Handle->new, IO::Handle->new);
    my $pid = IPC::Open3::open3(
      $stdin, $stdout, $stderr,
      ($realhost eq 'localhost' ? () : ('ssh', $realhost)),
      $prog, '-d', $dir
    );
    $pid or die "gah no pid\n";
    #warn "started $pid to $host";
    $stdout->autoflush;
    $stderr->autoflush;
    $track{$pid} = my $obj = {
      stdin  => $stdin,
      stdout => $stdout,
      stderr => $stderr,
      host   => $host,
    };
    $sel->add($obj->{sel_o} = [$stdout, $pid, 'stdout']);
    $sel->add($obj->{sel_e} = [$stderr, $pid, 'stderr']);
  }
  return($sel, %track);
}
# File-spec resolvers used by cluster().  Each one is called as
# $code->($opt, @specs) and returns the list of work items for that mode.

# Resolve the specs through repo_files() with $opt->{lazy_glob} switched
# on for the duration of the call only.
my $lglob = sub {
  my ($opt, @spec) = @_;
  local $opt->{lazy_glob} = 1;
  return(repo_files($opt, @spec));
};

# Drop $opt and hand the remaining arguments to _date_dwim().
my $datethru = sub {
  shift(@_);
  return(_date_dwim(@_));
};

# mode name => resolver.  Modes without an entry here are resolved by
# cluster() itself.
my %cluster_fspec = (
  # resolved through repo_files() with lazy globbing
  report        => $lglob,
  unique        => $lglob,
  # resolved through _date_dwim()
  compile       => $datethru,
  day_unique    => $datethru,
  # specs passed through untouched (minus $opt)
  month_unique  => sub { shift(@_); return(@_) },
  # only the first spec after $opt is used
  month_unique2 => sub { return($_[1]) },
);
# Spread the work for $mode over a set of worker processes.
#
#   $opt   - options hash ref; this sub reads {repository}, {cluster} and
#            {missok}
#   $mode  - sent to the workers as the first field of every request; also
#            picks the file-spec resolver from %cluster_fspec
#   @files - specs, turned into the work list by the %cluster_fspec entry
#            for $mode or, when there is none, by repo_files()
#
# $opt->{cluster} is a comma-separated list of "host" or "host:N" entries;
# "host:N" starts N workers, named host#1 .. host#N.  Every worker is sent
# "$mode\t$file\n" on its stdin and is handed the next item once it prints
# a ".done" line on stdout.  Everything else the workers print is echoed,
# prefixed with the host name and '#' (stdout) or '!' (stderr).
#
# Dies when a work file is missing (unless {missok} is set) and when work
# items are left over after all workers have gone away.
sub cluster {
  my ($opt, $mode, @files) = @_;

  require Cwd;
  require POSIX; # WNOHANG, for the SIGCHLD handler below
  my $dir = Cwd::abs_path($opt->{repository});

  if(my $code = $cluster_fspec{$mode}) {
    @files = $code->($opt, @files);
    #die join("\n  ", 'files', @files);
  }
  else {
    @files = repo_files($opt, @files);
    foreach my $file (@files) {
      next if(-e $file);
      my $msg = "missing $file\n";
      die $msg unless($opt->{missok});
      warn $msg;
    }
  }

  # "host:3, other" => host#1, host#2, host#3, other
  my @hosts = map({my ($h, $n) = split(/:/, $_);
    ($n ? map({$h.'#'.$_} 1..$n) : $h)
  } split(/, ?/, $opt->{cluster}));

  if(@hosts > @files) { # XXX weighting?
    warn "that would get boring\n";
    $#hosts = $#files;
  }
  my ($sel, %track) = start_cluster($dir, @hosts);

  # widest host name, for aligned output
  my $hlen = 0;
  foreach my $host (@hosts) {
    my $l = length($host);
    $hlen = $l if($l > $hlen);
  }

  # host name => pid (the reverse of %track)
  my %hmap = map({$track{$_}{host} => $_} keys(%track));

  my %blacklist;

  # echo worker output with a host / stream prefix
  my $output = sub {
    my ($host, $which, @lines) = @_;
    my $pref = ($which eq 'stderr' ? '!' : '#');
    printf("%-${hlen}s %s %s", $host, $pref, $_) for(@lines);
  };

  # retire a worker: close its stdin, echo what is left on its stderr and
  # drop both of its handles from the select set
  my $end_host = sub {
    my ($host) = @_;

    my $pid = delete($hmap{$host}) or die "no pid at $host";
    my $obj = delete($track{$pid});

    warn ' 'x($hlen+1), "closing $host\n";
    close($obj->{stdin});
    my $errh = $obj->{stderr};
    local $SIG{ALRM} = sub { warn "no stderr on $host\n"};
    alarm(2);
    $output->($host, 'stderr', <$errh>);
    alarm(0);
    $sel->remove(delete($obj->{sel_o})) or die;
    #$errh->blocking(0);
    $sel->remove(delete($obj->{sel_e})) or die;
  };

  # hand the next work item to a worker, or retire it if there is nothing
  # left (or the worker has been blacklisted)
  my $fill_host = sub {
    my ($host) = @_;

    if($blacklist{$host}) {
      warn "$host is blacklisted\n";
      eval { $end_host->($host) };
      return;
    }

    my $pid = $hmap{$host} or die "no pid at $host";
    my $obj = $track{$pid};
    my $fh = $obj->{stdin};

    unless(@files) {
      $end_host->($host);
      return;
    }

    my $file = shift(@files);
    #warn "fill $host with $file\n";
    if($opt->{missok} and not -e $file) {
      # skip ahead to the first item which exists, pushing the missing
      # ones to the back of the queue
      warn "still missing '$file'\n";
      my @later = ($file);
      while($file = shift(@files)) {
        if(-e $file) {
          push(@files, @later);
          last;
        }
        else {
          warn "still missing '$file'\n";
          push(@later, $file);
        }
      }
      $file or die "out of files to use while waiting\n";
      # grr, needs a loop
    }
    #warn "send $host $mode\t$file\n";
    print $fh "$mode\t$file\n";
  };

  # a worker which exits with a non-zero status is blacklisted and retired
  local $SIG{CHLD} = sub {
    my $child;
    while(($child = waitpid(-1, POSIX::WNOHANG())) > 0) {
      my $status = $? or next;
      my $code = $status >> 8;
      my $sig = $status & 127;
      my $obj = $track{$child};
      unless($obj) {
        # already retired by $end_host, so there is nothing to clean up
        warn "  error ($child) status: $status ($code/$sig)\n";
        next;
      }
      my $host = $obj->{host};
      warn "  error $host ($child) status: $status ($code/$sig)\n";
      $blacklist{$host} = 1;
      $end_host->($host);
    }
  };


  # go!
  $fill_host->($_) for(@hosts);

  # stderr is handled before stdout within one batch of ready handles
  my %f = (stderr => 0, stdout => 1);
  while($sel->count) {
    READ: while(my @ready = $sel->can_read) {
      @ready = sort({$f{$a->[2]} <=> $f{$b->[2]}} @ready);

      foreach my $bit (@ready) {
        my ($fh, $pid, $which) = @$bit;
        # the worker may have been retired since can_read() returned
        my $obj = $track{$pid} or next;
        my $host = $obj->{host};
        $fh->blocking(0);
        until(eof($fh)) {
          my $line = <$fh>;

          # XXX probably never need this bit
          unless(defined($line)) {
            warn "undef line from $host\n";
            $sel->remove($bit);
            last;
          }
          # TODO handle death
          if(($which eq 'stdout') and ($line =~ m/^\.done$/)) {
            #warn "$host said done\n";
            $fill_host->($host);
            last;
          }
          $output->($host, $which, $line);
        }
        $fh->blocking(1);
      }
    }
    #warn "twiddling\n";
  }
  if(@files) {
    die "ACK all my hosts died! (",
      scalar(@files), " files left to process.)\n";
  }
}

=head2 reskip

Regenerate the skiplist for a given chunk.

=cut

sub do_reskip {
  my ($opt, @files) = @_;

  @files = repo_files($opt, @files);

  my $skipper = get_skipper($opt);
  my $doskip = $skipper->get_matcher;
  foreach my $file (@files) {
    unless(-e $file) {
      die "no such file:\n  $file\n";
    }
    my $fh = open_file($file);

    my $nicename = nice_name($file);
    my $start = time;
    print "$nicename -- ",
      sprintf("%02d:%02d:%02d", (localtime($start))[2,1,0]), "\n";



( run in 0.953 second using v1.01-cache-2.11-cpan-39bf76dae61 )