AnyEvent-Proc
view release on metacpan or search on metacpan
lib/AnyEvent/Proc.pm view on Meta::CPAN
exec {$bin} @$cmd;
POSIX::_exit(126);
}
$$pidref = $pid;
my $w;
$w = AE::child $pid => sub {
my $status = $_[1] >> 8;
my $signal = $_[1] & 127;
my $coredump = $_[1] & 128;
AE::log info => "child exited with status $status" if $status;
AE::log debug => "child exited with signal $signal" if $signal;
AE::log note => "child exited with coredump" if $coredump;
undef $w;
map { close $_ } values %redir;
$cv->send($status);
};
$cv;
}
sub new {
my ( $class, %options ) = @_;
$options{args} ||= [];
my ( $rIN, $wIN ) = _rpipe;
my ( $rOUT, $wOUT ) = _wpipe;
my ( $rERR, $wERR ) = _wpipe;
my @xhs = @{ delete( $options{extras} ) || [] };
my @args = map { "$_" } @{ delete $options{args} };
my $pid;
my %redir = (
0 => $rIN,
1 => $wOUT,
2 => $wERR,
map { ( "$_" => $_->B ) } @xhs
);
my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
my $waiter = AE::cv;
my $self = bless {
handles => {
in => $wIN,
out => $rOUT,
err => $rERR,
map { ( "$_" => $_->A ) } @xhs,
},
pid => $pid,
listeners => {
exit => delete $options{on_exit},
ttl_exceed => delete $options{on_ttl_exceed},
},
eol => "\n",
cv => $cv,
alive => 1,
waiter => $waiter,
waiters => {
in => [],
out => [],
err => [],
map { ( "$_" => [] ) } @xhs
},
reapers => [],
} => ref $class
|| $class;
map { $_->{proc} = $self } @xhs;
{
my $eol = quotemeta $self->_eol;
$self->{reol} = delete $options{reol} || qr{$eol};
}
if ( $options{ttl} ) {
$self->{timer} = AnyEvent->timer(
after => delete $options{ttl},
cb => sub {
return unless $self->alive;
$self->kill;
$self->_emit('ttl_exceed');
}
);
}
my $kill = sub { $self->end };
if ( $options{timeout} ) {
$wIN->timeout( $options{timeout} );
$rOUT->timeout( $options{timeout} );
$rERR->timeout( $options{timeout} );
delete $options{timeout};
$self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
my $cb = sub { $self->_emit('timeout') };
$wIN->on_timeout($cb);
$rOUT->on_timeout($cb);
$rERR->on_timeout($cb);
}
if ( $options{wtimeout} ) {
$wIN->wtimeout( delete $options{wtimeout} );
$self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('wtimeout') };
$wIN->on_wtimeout($cb);
}
if ( $options{rtimeout} ) {
$rOUT->rtimeout( delete $options{rtimeout} );
$self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('rtimeout') };
$rOUT->on_rtimeout($cb);
}
if ( $options{etimeout} ) {
$rERR->rtimeout( delete $options{etimeout} );
$self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
my $cb = sub { $self->_emit('etimeout') };
$rERR->on_rtimeout($cb);
}
if ( $options{errstr} ) {
my $sref = delete $options{errstr};
$$sref = '';
$self->pipe( err => $sref );
}
if ( $options{outstr} ) {
my $sref = delete $options{outstr};
lib/AnyEvent/Proc.pm view on Meta::CPAN
fileno => fileno( $w->fh )
} => __PACKAGE__
. '::W';
}
# Run a command and collect its output, waiting for completion.
#
# All arguments are forwarded unchanged to run_cb(), followed by an
# internal callback that captures the results. The condvar returned by
# run_cb() is waited on first, then the captured results are unpacked and
# $? is set to the status shifted left by 8 bits.
#
# Returns ( $stdout, $stderr ) in list context. In scalar context only
# $stdout is returned and a non-empty $stderr is reported through carp.
sub run {
    my $result = AE::cv;

    # Wait on whatever run_cb() hands back ...
    my $done = run_cb( @_, sub { $result->send( \@_ ) } );
    $done->recv;

    # ... then pick up what the callback stored for us.
    my ( $stdout, $stderr, $exit_code ) = @{ $result->recv };
    $? = $exit_code << 8;

    return ( $stdout, $stderr ) if wantarray;

    carp $stderr if $stderr;
    return $stdout;
}
# Start a command and invoke a callback once waiting on it has finished.
#
# Called as run_cb( $bin, @args, $cb ): the first argument is the
# executable, the last one is the callback, everything in between is
# passed on as command arguments. Two strings are handed to the
# constructor through the outstr/errstr options to receive the output.
#
# When the wait callback fires, $? is set to the recorded status shifted
# left by 8 bits and $cb is invoked as $cb->( $stdout, $stderr, $status ).
# Returns whatever $proc->wait returns.
sub run_cb {
    my ( $bin, @args ) = @_;
    my $cb = pop @args;

    my $stdout = '';
    my $stderr = '';
    my $proc   = __PACKAGE__->new(
        bin    => $bin,
        args   => \@args,
        outstr => \$stdout,
        errstr => \$stderr,
    );
    $proc->finish;

    my $on_exit = sub {
        my $status = $proc->{status};
        $? = $status << 8;
        $cb->( $stdout, $stderr, $status );
    };
    return $proc->wait($on_exit);
}
# Register $handler as the listener for event $name, replacing whatever
# was stored under that name before. Returns the stored handler.
sub _on {
    my $self = shift;
    my ( $name, $handler ) = @_;
    return $self->{listeners}{$name} = $handler;
}
# Accessor for the handle stored under the 'in' key (looked up via _geth).
sub in {
    my ($self) = @_;
    return $self->_geth('in');
}
# Accessor for the handle stored under the 'out' key (looked up via _geth).
sub out {
    my ($self) = @_;
    return $self->_geth('out');
}
# Accessor for the handle stored under the 'err' key (looked up via _geth).
sub err {
    my ($self) = @_;
    return $self->_geth('err');
}
# Look up a handle by name in $self->{handles}.
# The name is taken from the LAST argument, so any arguments between the
# invocant and the name are ignored.
sub _geth {
    my $self = shift;
    my $name = pop;
    return $self->{handles}{$name};
}
# Accessor for the end-of-line string appended by writeln().
sub _eol {
    my ($self) = @_;
    return $self->{eol};
}
# Accessor for the end-of-line pattern used when reading lines.
sub _reol {
    my ($self) = @_;
    return $self->{reol};
}
# Dispatch event $name to its registered listener, if there is one.
# The listener is invoked as $listener->( $self, @args ); events without
# a defined listener are only logged at debug level.
sub _emit {
    my ( $self, $event, @payload ) = @_;
    AE::log debug => "trapped $event";

    # A plain fetch does not create the key, and an absent key and an
    # undefined value are treated alike, so one defined() test suffices.
    my $listener = $self->{listeners}{$event};
    if ( defined $listener ) {
        $listener->( $self, @payload );
    }
}
# Accessor for the process id stored in the object at construction time.
sub pid {
    my ($self) = @_;
    return $self->{pid};
}
# Send a signal to the child process.
#
# $signal may be a signal name, with or without a leading "SIG" in any
# letter case, or a number; it defaults to TERM. Signal 0 can be used to
# probe the process without delivering anything.
#
# Returns the result of the built-in kill, or 0 when no pid is recorded.
sub fire {
    my ( $self, $signal ) = @_;
    $signal = 'TERM' unless defined $signal;
    $signal =~ s{^sig}{}i;
    AE::log debug => "fire SIG$signal";

    # An undefined pid would numify to 0, and kill treats process id 0 as
    # "the caller's own process group" - never signal that by accident.
    my $pid = $self->pid;
    return 0 unless $pid;

    # CORE:: keeps this call distinct from this package's own kill() method.
    return CORE::kill( uc($signal), $pid );
}
# Fire the 'kill' signal at the child. Returns whatever fire() returns.
sub kill {
    my $self = shift;
    return $self->fire('kill');
}
# Fire a signal and follow up with KILL if the process is still alive
# after a grace period.
#
# Arguments are parsed from the end: an optional trailing code ref is the
# callback, the next value is the delay in seconds, and the one before
# that (if any) is the signal name, defaulting to TERM.
#
# With a callback: returns the result of $self->wait( ... ); the callback
# receives the arguments passed by wait. Without a callback: calls
# $self->wait directly and returns its result. In both cases the timer
# watcher is dropped once waiting is over.
sub fire_and_kill {
    my ( $self, @rest ) = @_;

    my $cb;
    $cb = pop @rest if ref $rest[-1] eq 'CODE';
    my $time   = pop @rest;
    my $signal = uc( pop(@rest) || 'TERM' );

    # Armed before the first signal is sent.
    my $w = AnyEvent->timer(
        after => $time,
        cb    => sub {
            return unless $self->alive;
            $self->kill;
        },
    );

    $self->fire($signal);

    unless ($cb) {
        my $exit = $self->wait;
        undef $w;
        return $exit;
    }

    return $self->wait(
        sub {
            undef $w;
            $cb->(@_);
        }
    );
}
sub alive {
my $self = shift;
return 0 unless $self->{alive};
$self->fire(0) ? 1 : 0;
lib/AnyEvent/Proc.pm view on Meta::CPAN
if ($cb) {
$self->{waiter}->cb($next);
return $self->{waiter};
}
else {
$self->{waiter}->recv;
return $next->( $self->{waiter} );
}
}
# Destroy the handle returned by in(). Returns $self so that calls can
# be chained.
sub finish {
    my $self  = shift;
    my $stdin = $self->in;
    $stdin->destroy;
    return $self;
}
# Tear down the process object: destroy every handle in
# $self->{handles}, then run every code ref in $self->{reapers}, in
# order. Returns $self so that calls can be chained.
sub end {
    my ($self) = @_;

    # Plain loops instead of map: these calls are made purely for their
    # side effects and no result list is needed.
    $_->destroy for values %{ $self->{handles} };
    $_->()      for @{ $self->{reapers} };

    return $self;
}
# Set the general timeout to 0 on the in, out and err handles, in that
# order. Returns the result of the last call (on the err handle).
sub stop_timeout {
    my $self = shift;

    my $stdin = $self->in;
    $stdin->timeout(0);

    my $stdout = $self->out;
    $stdout->timeout(0);

    my $stderr = $self->err;
    return $stderr->timeout(0);
}
# Set the write timeout of the in handle to 0.
# Returns the result of that call.
sub stop_wtimeout {
    my $self  = shift;
    my $stdin = $self->in;
    return $stdin->wtimeout(0);
}
# Set the read timeout of the out handle to 0.
# Returns the result of that call.
sub stop_rtimeout {
    my $self   = shift;
    my $stdout = $self->out;
    return $stdout->rtimeout(0);
}
# Set the read timeout of the err handle to 0.
# Returns the result of that call.
sub stop_etimeout {
    my $self   = shift;
    my $stderr = $self->err;
    return $stderr->rtimeout(0);
}
# Queue data on the 'in' handle via push_write( $type => @args ).
#
# Any exception raised while queueing is logged at warn level instead of
# being propagated. Returns 1 on success and 0 when an exception was
# caught.
sub write {
    my ( $self, $type, @args ) = @_;

    # try/catch yields the value of whichever block ran last, which
    # doubles as the success flag.
    return try {
        $self->_geth('in')->push_write( $type => @args );
        1;
    }
    catch {
        AE::log warn => $_;
        0;
    };
}
# Write each of @lines followed by the configured end-of-line string.
# Every line is passed to write() separately. Returns $self for chaining.
sub writeln {
    my ( $self, @lines ) = @_;

    foreach my $line (@lines) {
        $self->write( $line . $self->_eol );
    }

    return $self;
}
sub pipe {
my $self = shift;
my $peer = pop;
my $what = ( pop || 'out' );
if ( ref $what ) {
$what = "$what";
}
else {
$what = lc $what;
$what =~ s{^std}{};
}
use Scalar::Util qw(blessed);
my $sub;
if ( blessed $peer) {
if ( $peer->isa(__PACKAGE__) ) {
$sub = sub {
$peer->write(shift);
}
}
elsif ( $peer->isa('AnyEvent::Handle') ) {
$sub = sub {
$peer->push_write(shift);
}
}
elsif ( $peer->isa('Coro::Channel') ) {
$sub = sub {
$peer->put(shift);
}
}
elsif ( $peer->can('print') ) {
$sub = sub {
$peer->print(shift);
}
}
}
elsif ( ref $peer eq 'SCALAR' ) {
$sub = sub {
$$peer .= shift;
}
}
elsif ( ref $peer eq 'GLOB' ) {
$sub = sub {
print $peer shift();
}
}
elsif ( ref $peer eq 'CODE' ) {
$sub = $peer;
}
if ($sub) {
AE::log debug => "pipe $peer from $what";
my $aeh = $self->_geth($what);
$aeh->on_eof(
sub {
AE::log debug => "eof: $what";
shift->destroy;
$self->{waiter}->end;
}
lib/AnyEvent/Proc.pm view on Meta::CPAN
elsif ( $peer->isa('IO::Handle') ) {
return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
}
elsif ( $peer->isa('Coro::Channel') ) {
if ( my $class = load_class('Coro') ) {
return $class->new(
sub {
while ( my $x = $peer->get ) {
$self->write($x) or last;
Coro::cede();
}
$self->finish;
}
);
}
}
}
elsif ( ref $peer eq 'SCALAR' ) {
return _read_on_scalar(
$peer,
sub {
AE::log debug => "pull($peer)->STORE";
$self->write( shift() );
}
);
}
elsif ( ref $peer eq 'GLOB' ) {
return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
}
AE::log fatal => "cannot pull $peer to stdin";
}
# Call push_read(@args) on the handle named $what.
#
# Exceptions are logged at note level instead of being propagated.
# Returns 1 when the call went through and 0 when an exception was caught.
sub _push_read {
    my ( $self, $what, @args ) = @_;

    # try/catch yields the value of whichever block ran last, which
    # doubles as the success flag.
    return try {
        $self->_geth($what)->push_read(@args);
        1;
    }
    catch {
        AE::log note => "cannot push_read from std$what: $_";
        0;
    };
}
# Call unshift_read(@args) on the handle named $what.
#
# Exceptions are logged at note level instead of being propagated.
# Returns 1 when the call went through and 0 when an exception was caught.
sub _unshift_read {
    my ( $self, $what, @args ) = @_;

    # try/catch yields the value of whichever block ran last, which
    # doubles as the success flag.
    return try {
        $self->_geth($what)->unshift_read(@args);
        1;
    }
    catch {
        AE::log note => "cannot unshift_read from std$what: $_";
        0;
    };
}
# Queue a 'line' read on handle $what, using the configured end-of-line
# pattern; $sub is passed along as the read callback.
# Returns the result of _push_read.
sub _readline {
    my ( $self, $what, $sub ) = @_;
    my @request = ( line => $self->_reol, $sub );
    return $self->_push_read( $what, @request );
}
# Queue a 'chunk' read of $bytes on handle $what; $sub is passed along as
# the read callback. Returns the result of _push_read.
sub _readchunk {
    my $self = shift;
    my ( $what, $bytes, $sub ) = @_;
    return $self->_push_read( $what, 'chunk', $bytes, $sub );
}
# Build an adapter that calls $cb with only the SECOND argument the
# adapter itself receives; all other arguments are dropped.
sub _sub_cb {
    my $cb = shift;
    return sub { $cb->( $_[1] ) };
}
# Build an adapter that sends the SECOND argument it receives to the
# condvar $cv; all other arguments are dropped.
sub _sub_cv {
    my $cv = shift;
    return sub { $cv->send( $_[1] ) };
}
# Build an adapter that puts the SECOND argument it receives into the
# channel $ch; all other arguments are dropped.
sub _sub_ch {
    my $ch = shift;
    return sub { $ch->put( $_[1] ) };
}
# Read one line from handle $what and deliver it to the callback $cb.
# The callback is first recorded through _push_waiter, then a line read
# is queued. Returns the result of _readline.
sub _readline_cb {
    my $self = shift;
    my ( $what, $cb ) = @_;

    $self->_push_waiter( $what, $cb );
    return $self->_readline( $what, _sub_cb($cb) );
}
# Read one line from handle $what and deliver it through a condvar.
# A fresh condvar is created when none (or a false value) is supplied.
# If the read could not be queued, the condvar is signalled right away
# with no value. Returns the condvar.
sub _readline_cv {
    my ( $self, $what, $cv ) = @_;
    $cv ||= AE::cv;

    $self->_push_waiter( $what, $cv );

    my $queued = $self->_readline( $what, _sub_cv($cv) );
    $cv->send unless $queued;

    return $cv;
}
# Read one line from handle $what and deliver it through a channel.
# When no channel is supplied, Coro::Channel is loaded via load_class and
# a new instance is created. If the read could not be queued, the
# channel is shut down. Returns the channel.
sub _readline_ch {
    my ( $self, $what, $channel ) = @_;

    if ( !$channel ) {
        my $class = load_class('Coro::Channel');
        $channel = $class->new if $class;
    }

    $self->_push_waiter( $what, $channel );

    my $queued = $self->_readline( $what, _sub_ch($channel) );
    $channel->shutdown unless $queued;

    return $channel;
}
# Deliver every line read from handle $what to the callback $cb.
# The callback is recorded through _push_waiter, then an on_read hook is
# installed on the handle that queues a line read each time it fires.
# Returns the result of the on_read call.
sub _readlines_cb {
    my ( $self, $what, $cb ) = @_;

    $self->_push_waiter( $what, $cb );

    my $on_read = sub {
        $self->_readline( $what, _sub_cb($cb) );
    };
    return $self->_geth($what)->on_read($on_read);
}
sub _readlines_ch {
my ( $self, $what, $channel ) = @_;
lib/AnyEvent/Proc.pm view on Meta::CPAN
}
# Read one line from this handle wrapper into a channel by delegating to
# the owning process object ($self->{proc}), which receives the wrapper
# itself as the handle selector. Returns what _readline_ch returns.
sub readline_ch {
    my $self = shift;
    my ($ch) = @_;
    my $proc = $self->{proc};
    return $proc->_readline_ch( $self, $ch );
}
# Deliver every line read from this handle wrapper to the callback $cb by
# delegating to the owning process object ($self->{proc}), which receives
# the wrapper itself as the handle selector. Returns what _readlines_cb
# returns.
sub readlines_cb {
    my $self = shift;
    my ($cb) = @_;
    my $proc = $self->{proc};
    return $proc->_readlines_cb( $self, $cb );
}
# Deliver every line read from this handle wrapper into the channel $ch
# by delegating to the owning process object ($self->{proc}), which
# receives the wrapper itself as the handle selector.
#
# The channel variant of the reader is used here: handing a channel to
# the callback variant would make it call the channel as a code ref.
# Returns what _readlines_ch returns.
sub readlines_ch {
    my ( $self, $ch ) = @_;
    return $self->{proc}->_readlines_ch( $self => $ch );
}
# Read one line: obtain a condvar from readline_cv and return the result
# of calling recv on it.
sub readline {
    my $self = shift;
    my $cv   = $self->readline_cv;
    return $cv->recv;
}
1;
package    # hidden
  AnyEvent::Proc::W;

# Wrapper object for an extra writable handle pair.
#
# Instances are hashes holding the two handle ends under the keys 'w'
# and 'r', the file number under 'fileno' and, once attached to a
# process object, that object under 'proc'. In string context an
# instance evaluates to its file number, which is why it can be used
# directly as a hash key.

use overload '""' => sub { shift->{fileno} };
use Try::Tiny;

# The end stored under 'w'.
sub A { shift->{w} }

# The end stored under 'r'.
sub B { shift->{r} }

# Destroy the A end.
sub finish {
    my ($self) = @_;
    return $self->A->destroy;
}

# Install a read-timeout callback (the last argument) on the A end.
# NOTE(review): A is the end this class writes to, so on_wtimeout may
# have been intended - confirm against the code that configures the
# timeout before changing it.
sub on_timeout {
    my $self = shift;
    return $self->A->on_rtimeout(pop);
}

# Disable the timeout that on_timeout() listens to. The timer is
# switched off by setting it to 0, the same way the stop_* methods of
# the process object do it.
sub stop_timeout {
    my ($self) = @_;
    return $self->A->rtimeout(0);
}

# Queue data on the A end via push_write( $type => @args ). Exceptions
# are logged at note level; returns 1 on success, 0 otherwise.
sub write {
    my ( $self, $type, @args ) = @_;
    my $ok = 0;
    try {
        $self->A->push_write( $type => @args );
        $ok = 1;
    }
    catch {
        AE::log note => $_;
    };
    return $ok;
}

# Write each line followed by the owning process object's end-of-line
# string. Returns $self for chaining.
sub writeln {
    my ( $self, @lines ) = @_;
    my $eol = $self->{proc}->_eol;
    $self->write( $_ . $eol ) for @lines;
    return $self;
}

# Not implemented for this handle type; always dies.
sub pull { die 'UNIMPLEMENTED' }

1;
package    # hidden
  AnyEvent::Proc::TiedScalar;

# Tie class that turns every assignment to a scalar into a call of a
# code ref: the code ref itself is blessed into this class and is
# invoked with each stored value. Reading the scalar always gives undef.

use Tie::Scalar;
our @ISA = ('Tie::Scalar');

# tie $scalar, CLASS, $code_ref - the last argument is blessed into the
# class given as the first one.
sub TIESCALAR {
    my $on_store = pop;
    my $class    = shift;
    return bless $on_store, $class;
}

# Nothing is ever kept, so a read always yields undef.
sub FETCH {
    return undef;
}

# Forward the stored value (the last argument) to the blessed code ref.
sub STORE {
    my $on_store = shift;
    my $value    = pop;
    return $on_store->($value);
}

1;
__END__
=pod
=head1 NAME
AnyEvent::Proc - Run external commands
=head1 VERSION
version 0.105
=head1 SYNOPSIS
my $proc = AnyEvent::Proc->new(bin => 'cat');
$proc->writeln('hello');
my $hello = $proc->readline;
$proc->fire;
$proc->wait;
=head1 DESCRIPTION
AnyEvent::Proc is an L<AnyEvent>-based helper class for running external commands with full control over STDIN, STDOUT and STDERR.
=head1 METHODS
=head2 new(%options)
=over 4
=item * I<bin> (mandatory)
( run in 1.784 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )