ApacheLog-Parser
view release on metacpan or search on metacpan
bin/loghack view on Meta::CPAN
}
# Build a short display pattern for a path.
#
# The path is split on runs of "/"; the last component is passed through
# name_as_date() (defined elsewhere in this file -- presumably it maps a
# file name to a date string; confirm there) and wrapped as "*.<date>.*".
# When a parent component is present, the result is "<parent>/*.<date>.*",
# otherwise only the wrapped pattern is returned.
sub nice_name {
    my ($name) = @_;

    my @parts = split(m{/+}, $name);
    my $leaf  = pop(@parts);
    my $date  = name_as_date($leaf);
    my $glob  = '*.' . $date . '.*';

    # no directory component left: the pattern stands alone
    return($glob) unless(@parts);

    return($parts[-1] . '/' . $glob);
}
# Record which source file produced a given checksum.
#
# Writes the basename of $file (plus a newline) into the file named
# "$dir$md5" -- the two are concatenated as-is, so $dir is expected to
# carry its own trailing separator (NOTE(review): confirm against callers).
# $dir is created first if needed.  If the record already exists it is
# left alone and a warning is issued.  $opt is accepted but not used here.
# Dies if the record cannot be opened or closed.
sub record_source {
    my ($opt, $file, $dir, $md5) = @_;

    my $record = "$dir$md5";
    want_dir($dir);

    # never overwrite an existing record
    if(-e $record) {
        warn "skipping $record ($file)\n";
        return;
    }

    open(my $out, '>', $record) or die "cannot write '$record' $!";
    print {$out} File::Basename::basename($file), "\n";
    # buffered write errors only surface at close
    close($out) or die "cannot write '$record' $!";
}
# Make sure $dir exists as a directory, creating it (one level only) when
# it does not.  Dies with "cannot create ..." if mkdir fails and the
# directory still is not there afterwards.
sub want_dir {
    my ($dir) = @_;

    return if(-d $dir);

    # A failed mkdir is only fatal when the directory is still missing:
    # somebody else may have created it between the test above and here.
    mkdir($dir) or -d $dir or die "cannot create $dir $!";
}
# Worker loop: serve requests arriving on STDIN.
#
# Changes into the directory named by $opt->{daemon} (dies if that fails),
# turns on autoflush for the currently selected output handle, then for
# each input line: strips the newline, splits it on tabs and hands the
# fields to main().  After each request a ".done" line is printed so the
# reader on the other end knows the request has finished.
# @args is accepted but not used.
sub daemon {
    my ($opt, @args) = @_;

    my $workdir = $opt->{daemon};
    chdir($workdir) or die "no such dir $workdir\n";

    # unbuffered output, so the ".done" marker is seen immediately
    $| = 1;

    while(defined(my $request = <STDIN>)) {
        chomp($request);
        main(split(/\t/, $request));
        print ".done\n";
    }
}
# Start one worker process per entry of @hosts.
#
# Each worker runs this program (by basename of $0) with the arguments
# "-d $dir"; for any host other than 'localhost' the command is run
# through ssh.  A trailing "#<digits>" on a host entry is stripped before
# connecting, but the full entry is kept as the worker's identity.
#
# Returns ($sel, %track):
#   $sel   - an IO::Select holding, per worker, two array refs
#            [$handle, $pid, 'stdout'] and [$handle, $pid, 'stderr']
#   %track - pid => { stdin, stdout, stderr, host, sel_o, sel_e }
sub start_cluster {
    my ($dir, @hosts) = @_;

    require IPC::Open3;
    require IO::Select;
    require IO::Handle;

    my $sel = IO::Select->new();
    my %track;
    my $prog = basename($0);

    for my $host (@hosts) {
        my $realhost = $host;
        $realhost =~ s/#\d+$//;

        # remote workers are reached through ssh, local ones run directly
        my @via;
        @via = ('ssh', $realhost) unless($realhost eq 'localhost');

        # stdin is left undefined for open3 to fill in; stdout and stderr
        # get separate handles of their own
        my $stdin;
        my $stdout = IO::Handle->new;
        my $stderr = IO::Handle->new;

        my $pid = IPC::Open3::open3(
            $stdin, $stdout, $stderr,
            @via, $prog, '-d', $dir
        );
        $_->autoflush for($stdout, $stderr);
        $pid or die "gah no pid\n";

        my $obj = {
            stdin  => $stdin,
            stdout => $stdout,
            stderr => $stderr,
            host   => $host,
        };
        $track{$pid} = $obj;

        # the select entries carry pid and stream name along with the handle
        $obj->{sel_o} = [$stdout, $pid, 'stdout'];
        $obj->{sel_e} = [$stderr, $pid, 'stderr'];
        $sel->add($obj->{sel_o});
        $sel->add($obj->{sel_e});
    }

    return($sel, %track);
}
# Per-mode translators used by cluster() to turn its file arguments into
# the list of work items.  Every translator is called as
# $code->($opt, @spec).

# Expand the spec through repo_files() with $opt->{lazy_glob} switched on
# for the duration of the call only.
my $lglob = sub {
    my ($opt, @spec) = @_;
    local $opt->{lazy_glob} = 1;
    return repo_files($opt, @spec);
};

# Drop $opt and hand the remaining arguments to _date_dwim().
my $datethru = sub {
    shift(@_);
    return _date_dwim(@_);
};

my %cluster_fspec = (
    report        => $lglob,
    unique        => $lglob,
    compile       => $datethru,
    day_unique    => $datethru,
    # pass the arguments through unchanged (minus $opt)
    month_unique  => sub { shift(@_); return @_ },
    # only the first argument after $opt is used
    month_unique2 => sub { return $_[1] },
);
# Distribute work items for $mode across a set of worker processes.
#
#   $opt   - option hash; keys read here:
#              repository  directory handed to the workers (made absolute)
#              cluster     comma-separated host list; "host:N" starts N
#                          workers on that host (tracked as "host#1".."host#N")
#              missok      tolerate files that do not exist (yet)
#   $mode  - command name sent to the workers; also selects a translator
#            from %cluster_fspec, falling back to repo_files()
#   @files - file/date spec, translated into the list of work items
#
# Each worker gets one "$mode\t$item" line at a time on its stdin and
# answers with a ".done" line on stdout when it wants more.  Everything
# else the workers print is echoed, prefixed with the host name and '#'
# (stdout) or '!' (stderr).  A worker that exits with non-zero status is
# blacklisted and shut down.
#
# Dies if a required file is missing (without missok), if no existing
# file is left while waiting under missok, or if items remain after every
# worker has gone away.
sub cluster {
    my ($opt, $mode, @files) = @_;

    require Cwd;
    require POSIX;    # the SIGCHLD handler below calls POSIX::WNOHANG()
    my $dir = Cwd::abs_path($opt->{repository});

    if(my $code = $cluster_fspec{$mode}) {
        @files = $code->($opt, @files);
        #die join("\n ", 'files', @files);
    }
    else {
        @files = repo_files($opt, @files);
        foreach my $file (@files) {
            next if(-e $file);
            my $msg = "missing $file\n";
            if($opt->{missok}) { warn $msg } else { die $msg }
        }
    }

    # expand "host:N" into N distinct worker names "host#1" .. "host#N"
    my @hosts = map({my ($h, $n) = split(/:/, $_);
        ($n ? map({$h.'#'.$_} 1..$n) : $h)
    } split(/, ?/, $opt->{cluster}));

    # never start more workers than there are items
    if(@hosts > @files) { # XXX weighting?
        warn "that would get boring\n";
        $#hosts = $#files;
    }

    my ($sel, %track) = start_cluster($dir, @hosts);

    # widest host name, used to line up the output prefix
    my $hlen = 0;
    foreach my $host (@hosts) {
        my $l = length($host);
        $hlen = $l if($l > $hlen);
    }

    #die map({"$_ => " . join(", ", %{$track{$_}})} keys(%track));
    my %hmap = map({$track{$_}{host} => $_} keys(%track));   # host => pid
    my %blacklist;

    # echo worker output with a "host #" / "host !" prefix
    my $output = sub {
        my ($host, $which, @lines) = @_;
        my $pref = ($which eq 'stderr' ? '!' : '#');
        printf("%-${hlen}s %s %s", $host, $pref, $_) for(@lines);
    };

    # shut a worker down: close its stdin, echo whatever is left on its
    # stderr and stop watching its handles
    my $end_host = sub {
        my ($host) = @_;
        my $pid = delete($hmap{$host}) or die "no pid at $host";
        my $obj = delete($track{$pid});
        warn ' 'x($hlen+1), "closing $host\n";
        close($obj->{stdin});
        # Drain the worker's stderr.  (This used to read the stdout handle
        # while labelling the result as stderr, which threw away the error
        # output of a worker that had just died.)
        my $errh = $obj->{stderr};
        local $SIG{ALRM} = sub { warn "no stderr on $host\n"};
        alarm(2);
        $output->($host, 'stderr', <$errh>);
        alarm(0);
        $sel->remove(delete($obj->{sel_o})) or die;
        #$errh->blocking(0);
        $sel->remove(delete($obj->{sel_e})) or die;
    };

    # hand the next item to a worker, or shut it down when nothing is left
    my $fill_host = sub {
        my ($host) = @_;
        if($blacklist{$host}) {
            warn "$host is blacklisted\n";
            # the SIGCHLD handler may already have closed it
            eval { $end_host->($host) };
            return;
        }
        my $pid = $hmap{$host} or die "no pid at $host";
        my $obj = $track{$pid};
        my $fh = $obj->{stdin};
        unless(@files) {
            $end_host->($host);
            return;
        }
        my $file = shift(@files);
        #warn "fill $host with $file\n";
        if($opt->{missok} and not -e $file) {
            # Skip ahead to the first item that exists now and requeue the
            # missing ones behind the rest.
            warn "still missing '$file'\n";
            my @later = ($file);
            # defined(): a work item named "0" must not end the search
            while(defined($file = shift(@files))) {
                if(-e $file) {
                    push(@files, @later);
                    last;
                }
                else {
                    warn "still missing '$file'\n";
                    push(@later, $file);
                }
            }
            defined($file) or die "out of files to use while waiting\n";
            # grr, needs a loop
        }
        #warn "send $host $mode\t$file\n";
        print $fh "$mode\t$file\n";
    };

    # reap children; one that exits with a non-zero status gets its host
    # blacklisted and closed
    local $SIG{CHLD} = sub {
        my $child;
        while(($child = waitpid(-1, POSIX::WNOHANG())) > 0) {
            if($?) {
                my $code = $? >> 8;
                my $sig = $? & 127;
                # A child that $end_host already dropped (or that is not
                # one of ours) has no entry; looking up its host would
                # autovivify one and then die with "no pid at".
                my $obj = $track{$child};
                unless($obj) {
                    warn " error (closed) ($child) status: $? ($code/$sig)\n";
                    next;
                }
                my $host = $obj->{host};
                warn " error $host ($child) status: $? ($code/$sig)\n";
                $blacklist{$host} = 1;
                $end_host->($host);
            }
        }
    };

    # go!
    $fill_host->($_) for(@hosts);

    # handle stderr before stdout when both are ready
    my %f = (stderr => 0, stdout => 1);
    while($sel->count) {
        READ: while(my @ready = $sel->can_read) {
            @ready = sort({$f{$a->[2]} <=> $f{$b->[2]}} @ready);
            foreach my $bit (@ready) {
                my ($fh, $pid, $which) = @$bit;
                my $obj = $track{$pid};
                my $host = $obj->{host};
                # read what is available without stalling on a partial line
                $fh->blocking(0);
                until(eof($fh)) {
                    my $line = <$fh>;
                    # XXX probably never need this bit
                    unless(defined($line)) {
                        warn "undef line from $host\n";
                        $sel->remove($bit);
                        last;
                    }
                    # TODO handle death
                    # a literal ".done" line on stdout means "send me more"
                    if(($which eq 'stdout') and ($line =~ m/^\.done$/)) {
                        #warn "$host said done\n";
                        $fill_host->($host);
                        last;
                    }
                    $output->($host, $which, $line);
                }
                $fh->blocking(1);
            }
        }
        #warn "twiddling\n";
    }

    if(@files) {
        die "ACK all my hosts died! (",
            scalar(@files), " files left to process.)\n";
    }
}
=head2 reskip
Regenerate the skiplist for a given chunk.
=cut
sub do_reskip {
my ($opt, @files) = @_;
@files = repo_files($opt, @files);
my $skipper = get_skipper($opt);
my $doskip = $skipper->get_matcher;
foreach my $file (@files) {
unless(-e $file) {
die "no such file:\n $file\n";
}
my $fh = open_file($file);
my $nicename = nice_name($file);
my $start = time;
print "$nicename -- ",
sprintf("%02d:%02d:%02d", (localtime($start))[2,1,0]), "\n";
( run in 0.953 second using v1.01-cache-2.11-cpan-39bf76dae61 )