ApacheLog-Parser
view release on metacpan or search on metacpan
bin/cron.loghack view on Meta::CPAN
$home ||= shift(@args);
defined($home) or die "must have a 'home' argument!\n";
chdir($home) or die "cannot chdir('$home') $!";
$o{path} = File::Fu->file($o{path});
my $self = bless(\%o, __PACKAGE__);
local $SIG{__DIE__} = $self->quiet ? sub {
die @_ if $^S; # get out if we're in an eval
$self->stderr(@_);
$self->death(@_);
} : $SIG{__DIE__};
my @got = $self->do_fetch(@only ? @only : $self->servers);
my @links = $self->do_links(@got);
$self->do_import(@links);
$self->do_archive(@links);
}
sub servers {
bin/cron.loghack view on Meta::CPAN
return($ret, $out, $err);
}
# Hands @command to _run(), recording the command line and each line of
# its stdout/stderr through the stdout()/stderr() methods.
#
# Dies with the command and its stderr text when _run() reports a false
# status; otherwise returns the captured stdout as one string.
sub run {
  my ($self, @command) = @_;

  $self->stdout('running', " @command");

  my ($ok, $stdout_text, $stderr_text) = $self->_run(@command);

  # record what the command printed, one entry per line
  $self->stdout(split(/\n/, $stdout_text));
  $self->stderr(split(/\n/, $stderr_text));

  die "@command failed:\n$stderr_text" unless($ok);

  return($stdout_text);
}
# Passes the given command to _run() and returns its stdout split into
# lines, without recording anything.
#
# Dies (message includes the command and stderr text) when _run()
# reports a false status.  When the output is exactly one line, that
# line is returned as a single value; otherwise the list of lines is
# returned (so in scalar context a multi-line result yields the count).
sub capture {
  my $self = shift;

  my ($ok, $text, $errors) = $self->_run(@_);
  die "@_ failed $errors" unless($ok);

  my @lines = split(/\n/, $text);
  return(@lines == 1 ? $lines[0] : @lines);
}
# Prints a "DEATH" banner followed by every output entry recorded in
# $self->{outputs}, to the currently selected output handle.
#
# Arguments: the message parts that caused the death (printed joined by
# spaces after the banner).
#
# Each recorded entry is an array ref whose first element is the type
# tag and whose remaining elements are the text.  Known tags are
# replaced by a short prefix ('E ' for stderr, '# ' for stdout); any
# other tag is printed as-is.
#
# The stored entries are read without being modified, so calling this
# more than once (or inspecting $self->{outputs} afterwards) still sees
# the type tag on every entry.
sub death {
  my ($self, @last) = @_;

  my %sym = (stderr => 'E ', stdout => '# ');
  print "DEATH\n @last\n";

  my $out = $self->{outputs} ||= [];
  foreach my $line (@$out) {
    # copy rather than shift(): shifting would strip the tag from the
    # stored record itself
    my ($type, @text) = @$line;
    my $c = $sym{$type} || $type; # unknown tags fall through unchanged
    print join(' ', $c, @text), "\n";
  }
}
# Lookup from an output type name to the matching standard filehandle
# glob (consulted by _store_io).
my %fh = (stdout => \*STDOUT, stderr => \*STDERR);
# Passes the given lines to _store_io() tagged with the type 'stderr'
# and returns whatever _store_io() returns.
sub stderr {
  my ($self, @lines) = @_;
  return($self->_store_io('stderr', @lines));
}
# Passes the given lines to _store_io() tagged with the type 'stdout'
# and returns whatever _store_io() returns.
sub stdout {
  my ($self, @lines) = @_;
  return($self->_store_io('stdout', @lines));
}
sub _store_io {
my $self = shift;
my ($type, @lines) = @_;
unless($self->quiet) {
my $fh = $fh{$type};
bin/loghack view on Meta::CPAN
my ($dir, @hosts) = @_;
require IPC::Open3;
require IO::Select;
require IO::Handle;
my $sel = IO::Select->new();
my %track;
my $prog = basename($0);
foreach my $host (@hosts) {
(my $realhost = $host) =~ s/#\d+$//;
my $stdin;
my ($stdout, $stderr) = map({IO::Handle->new} 1..3);
my $pid = IPC::Open3::open3(
$stdin, $stdout, $stderr,
($realhost eq 'localhost' ? () : ('ssh', $realhost)),
$prog, '-d', $dir
);
#warn "started $pid to $host";
$stdout->autoflush;
$stderr->autoflush;
$pid or die "gah no pid\n";
#warn "$stdin, $stdout, $stderr";
$track{$pid} = my $obj = {
stdin => $stdin,
stdout => $stdout,
stderr => $stderr,
host => $host,
};
$sel->add($obj->{sel_o} = [$stdout, $pid, 'stdout']);
$sel->add($obj->{sel_e} = [$stderr, $pid, 'stderr']);
}
return($sel, %track);
}
# Wrapper around repo_files() that switches on the 'lazy_glob' option
# for the duration of the call only.
#
# Takes the options hash ref followed by the file specs; returns
# whatever repo_files() returns for them.
my $lglob = sub {
  my ($options, @patterns) = @_;

  # local() restores the caller's previous lazy_glob value on exit
  local $options->{lazy_glob} = 1;

  return(repo_files($options, @patterns));
};
# Drops its first argument and hands the remaining ones to _date_dwim(),
# returning that function's result.
my $datethru = sub {
  shift;
  return(_date_dwim(@_));
};
my %cluster_fspec = (
bin/loghack view on Meta::CPAN
$hlen = $l if($l > $hlen);
}
#die map({"$_ => " . join(", ", %{$track{$_}})} keys(%track));
my %hmap = map({$track{$_}{host} => $_} keys(%track));
my %sels = map({$track{$_->[1]}{host} => $_} $sel->handles);
my %blacklist;
# Prints each given line prefixed by the host name (left-justified and
# padded to $hlen columns) and a marker: '!' for stderr, '#' for
# anything else.  Lines are printed as given; no newline is appended.
my $output = sub {
  my ($host, $which, @lines) = @_;

  my $marker = '#';
  $marker = '!' if($which eq 'stderr');

  foreach my $text (@lines) {
    printf("%-${hlen}s %s %s", $host, $marker, $text);
  }
};
# Shuts down the worker for $host: forgets its pid in %hmap and its
# record in %track, closes the worker's stdin, prints whatever can still
# be read from it, and removes both of its entries from the select set.
#
# Dies if $host has no pid in %hmap, or if either select entry cannot be
# removed.
my $end_host = sub {
my ($host) = @_;
my $pid = delete($hmap{$host}) or die "no pid at $host";
my $obj = delete($track{$pid});
warn ' 'x($hlen+1), "closing $host\n";
close($obj->{stdin});
# NOTE(review): the handle read here is the worker's *stdout*, although
# the variable name, the alarm message and the 'stderr' label passed to
# $output all say stderr -- confirm which handle was intended.
my $errh = $obj->{stdout};
# NOTE(review): this handler only warns; it presumably does not abort
# the blocking read below, so the 2-second alarm may not act as a real
# timeout -- confirm.
local $SIG{ALRM} = sub { warn "no stderr on $host\n"};
alarm(2);
# list-context readline: reads every remaining line up to EOF
$output->($host, 'stderr', <$errh>);
alarm(0);
$sel->remove(delete($obj->{sel_o})) or die;
#$errh->blocking(0);
$sel->remove(delete($obj->{sel_e})) or die;
};
my $fill_host = sub {
my ($host) = @_;
if($blacklist{$host}) {
warn "$host is blacklisted\n";
bin/loghack view on Meta::CPAN
$blacklist{$host} = 1;
$end_host->($host);
}
}
};
# go!
$fill_host->($_) for(@hosts);
my %f = (stderr => 0, stdout => 1);
while($sel->count) {
READ: while(my @ready = $sel->can_read) {
@ready = sort({$f{$a->[2]} <=> $f{$b->[2]}} @ready);
foreach my $bit (@ready) {
my ($fh, $pid, $which) = @$bit;
my $obj = $track{$pid};
my $host = $obj->{host};
$fh->blocking(0);
until(eof($fh)) {
( run in 0.283 second using v1.01-cache-2.11-cpan-4e96b696675 )