AnyEvent-Proc
view release on metacpan or search on metacpan
lib/AnyEvent/Proc.pm view on Meta::CPAN
sub new {
my ( $class, %options ) = @_;
$options{args} ||= [];
my ( $rIN, $wIN ) = _rpipe;
my ( $rOUT, $wOUT ) = _wpipe;
my ( $rERR, $wERR ) = _wpipe;
my @xhs = @{ delete( $options{extras} ) || [] };
my @args = map { "$_" } @{ delete $options{args} };
my $pid;
my %redir = (
0 => $rIN,
1 => $wOUT,
2 => $wERR,
map { ( "$_" => $_->B ) } @xhs
);
my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
my $waiter = AE::cv;
my $self = bless {
handles => {
in => $wIN,
out => $rOUT,
err => $rERR,
map { ( "$_" => $_->A ) } @xhs,
},
pid => $pid,
listeners => {
exit => delete $options{on_exit},
ttl_exceed => delete $options{on_ttl_exceed},
},
eol => "\n",
cv => $cv,
alive => 1,
waiter => $waiter,
waiters => {
in => [],
out => [],
err => [],
map { ( "$_" => [] ) } @xhs
},
reapers => [],
} => ref $class
|| $class;
map { $_->{proc} = $self } @xhs;
{
my $eol = quotemeta $self->_eol;
$self->{reol} = delete $options{reol} || qr{$eol};
}
if ( $options{ttl} ) {
$self->{timer} = AnyEvent->timer(
after => delete $options{ttl},
cb => sub {
return unless $self->alive;
$self->kill;
$self->_emit('ttl_exceed');
}
);
}
my $kill = sub { $self->end };
if ( $options{timeout} ) {
$wIN->timeout( $options{timeout} );
$rOUT->timeout( $options{timeout} );
$rERR->timeout( $options{timeout} );
delete $options{timeout};
$self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
my $cb = sub { $self->_emit('timeout') };
$wIN->on_timeout($cb);
$rOUT->on_timeout($cb);
$rERR->on_timeout($cb);
}
if ( $options{wtimeout} ) {
$wIN->wtimeout( delete $options{wtimeout} );
$self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('wtimeout') };
$wIN->on_wtimeout($cb);
}
if ( $options{rtimeout} ) {
$rOUT->rtimeout( delete $options{rtimeout} );
$self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('rtimeout') };
$rOUT->on_rtimeout($cb);
}
if ( $options{etimeout} ) {
$rERR->rtimeout( delete $options{etimeout} );
$self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
my $cb = sub { $self->_emit('etimeout') };
$rERR->on_rtimeout($cb);
}
if ( $options{errstr} ) {
my $sref = delete $options{errstr};
$$sref = '';
$self->pipe( err => $sref );
}
if ( $options{outstr} ) {
my $sref = delete $options{outstr};
$$sref = '';
$self->pipe( out => $sref );
}
$waiter->begin;
$cv->cb(
sub {
$self->{status} = shift->recv;
$self->{alive} = 0;
undef $self->{timer};
$waiter->end;
$self->_emit( exit => $self->{status} );
}
);
if ( keys %options ) {
AE::log note => "unknown left-over option(s): " . join ', ' =>
keys %options;
}
$self;
}
sub reader {
my ( $r, $w ) = _wpipe;
bless {
r => $r,
w => $w,
fileno => fileno( $r->fh )
} => __PACKAGE__
. '::R';
}
sub writer {
my ( $r, $w ) = _rpipe;
bless {
r => $r,
w => $w,
fileno => fileno( $w->fh )
} => __PACKAGE__
. '::W';
}
sub run {
my $cv = AE::cv;
run_cb(
@_,
sub {
$cv->send( \@_ );
}
)->recv;
my ( $out, $err, $status ) = @{ $cv->recv };
$? = $status << 8;
if (wantarray) {
return ( $out, $err );
}
else {
carp $err if $err;
return $out;
}
}
sub run_cb {
my $bin = shift;
my $cb = pop;
my @args = @_;
my ( $out, $err ) = ( '', '' );
my $proc = __PACKAGE__->new(
bin => $bin,
args => \@args,
lib/AnyEvent/Proc.pm view on Meta::CPAN
$proc->finish;
$proc->wait(
sub {
my $status = $proc->{status};
$? = $status << 8;
$cb->( $out, $err, $status );
}
);
}
sub _on {
my ( $self, $name, $handler ) = @_;
$self->{listeners}->{$name} = $handler;
}
sub in { shift->_geth('in') }
sub out { shift->_geth('out') }
sub err { shift->_geth('err') }
sub _geth {
shift->{handles}->{ pop() };
}
sub _eol { shift->{eol} }
sub _reol { shift->{reol} }
sub _emit {
my ( $self, $name, @args ) = @_;
AE::log debug => "trapped $name";
if ( exists $self->{listeners}->{$name}
and defined $self->{listeners}->{$name} )
{
$self->{listeners}->{$name}->( $self, @args );
}
}
sub pid {
shift->{pid};
}
sub fire {
my ( $self, $signal ) = @_;
$signal = 'TERM' unless defined $signal;
$signal =~ s{^sig}{}i;
AE::log debug => "fire SIG$signal";
kill uc $signal => $self->pid;
}
sub kill {
my ($self) = @_;
$self->fire('kill');
}
sub fire_and_kill {
my $self = shift;
my $cb = ( ref $_[-1] eq 'CODE' ? pop : undef );
my $time = pop;
my $signal = uc( pop || 'TERM' );
my $w = AnyEvent->timer(
after => $time,
cb => sub {
return unless $self->alive;
$self->kill;
}
);
$self->fire($signal);
if ($cb) {
return $self->wait(
sub {
undef $w;
$cb->(@_);
}
);
}
else {
my $exit = $self->wait;
undef $w;
return $exit;
}
}
sub alive {
my $self = shift;
return 0 unless $self->{alive};
$self->fire(0) ? 1 : 0;
}
sub wait {
my ( $self, $cb ) = @_;
my $next = sub {
my $cv = shift;
$cv->recv;
waitpid $self->{pid} => 0;
$cb->( $self->{status} ) if ref $cb eq 'CODE';
$self->end;
$self->{status};
};
AE::log debug => "waiting for "
. ( $self->{waiter}->{_ae_counter} ) . " ends";
if ($cb) {
$self->{waiter}->cb($next);
return $self->{waiter};
}
else {
$self->{waiter}->recv;
return $next->( $self->{waiter} );
}
}
sub finish {
my ($self) = @_;
$self->in->destroy;
$self;
}
sub end {
my ($self) = @_;
map { $_->destroy } values %{ $self->{handles} };
( run in 0.886 second using v1.01-cache-2.11-cpan-39bf76dae61 )