MYDan
view release on metacpan or search on metacpan
lib/MYDan/Util/MIO/CMD.pm view on Meta::CPAN
=head1 SYNOPSIS
use MYDan::Util::MIO::CMD;
my @node = qw( host1 host2 ... );
my @cmd = qw( ssh {} wc );
my $cmd = MYDan::Util::MIO::CMD->new( map { $_ => \@cmd } @node );
my $result = $cmd->run( max => 32, log => \*STDERR, timeout => 300 );
my $stdout = $result->{stdout};
my $stderr = $result->{stderr};
my $error = $result->{error};
=cut
use strict;
use warnings;
use Carp;
use IPC::Open3;
use Time::HiRes qw( time );
use POSIX qw( :sys_wait_h );
use IO::Poll qw( POLLIN POLLHUP POLLOUT );
use Tie::File;
use FindBin qw( $Script );
use base qw( MYDan::Util::MIO );
our %RUN = ( %MYDan::Util::MIO::RUN, interchange => '{}' );
our %MAX = %MYDan::Util::MIO::MAX;
sub new
{
my $self = shift;
$self->cmd( @_ );
}
=head1 METHODS
=head3 run( %param )
Run commands in parallel.
The following parameters may be defined in I<%param>:
max : ( default 128 ) number of commands in parallel.
log : ( default STDERR ) a handle to report progress.
timeout : ( default 300 ) number of seconds allotted for each command.
input : ( default from STDIN ) input buffer.
Returns HASH of HASH of nodes. First level is indexed by type
( I<stdout>, I<stderr>, or I<error> ). Second level is indexed by message.
=cut
sub run
{
local $| = 1;
my $self = shift;
my @node = keys %$self;
my ( $run, $ext, %run, %result, %busy ) = ( 1, "$Script.$$", %RUN, @_ );
my ( $max, $timeout, $interchange ) = @run{ qw( max timeout interchange ) };
my $input = defined $run{input} ? $run{input} : -t STDIN ? '' : <STDIN>;
$SIG{INT} = $SIG{TERM} = sub
{
print STDERR "killed\n";
$run = 0;
};
for ( my $time = time; $run && ( @node || %busy ); )
{
$run = 0 if time - $time > $timeout;
while ( @node && keys %busy < $max )
{
my $node = shift @node;
my $log = "/tmp/$node.$ext";
my $cmd = $self->{$node};
my @cmd = map { my $t = $_; $t =~ s/$interchange/$node/g; $t } @$cmd;
if ( $run{noop} )
{
print join ' ', @cmd, "\n";
next;
}
print "$node started.\n" if $run{verbose};
if ( my $pid = fork() ) { $busy{$pid} = [ $log, $node ]; next }
open STDOUT, ">>$log";
open STDERR, ">>$log";
exec sprintf join ' ', @cmd;
exit 0;
}
for ( keys %busy )
{
my $pid = waitpid( -1, WNOHANG );
next if $pid <= 0;
my $stat = $? >> 8;
my ( $log, $node ) = @{ delete $busy{$pid} };
print "$node done.\n" if $run{verbose};
tie my @log, 'Tie::File', $log,recsep => "\n";
my $tmp = join "\n", @log;
$tmp =~ s/$node/$interchange/g if $run{xx};
push @{ $result{output}{ join "\n", $tmp, "--- $stat", '' } }, $node;
unlink $log;
}
}
kill 9, keys %busy;
push @{ $result{output}{killed} }, map{ $busy{$_}[1]}keys %busy;
push @{ $result{output}{norun} }, @node;
unlink glob "/tmp/*.$ext";
unlink $input if $input && -f $input;
return wantarray ? %result : \%result;
}
1;
( run in 0.755 second using v1.01-cache-2.11-cpan-5a3173703d6 )