AnyEvent-Proc

 view release on metacpan or  search on metacpan

lib/AnyEvent/Proc.pm  view on Meta::CPAN

        exec {$bin} @$cmd;

        POSIX::_exit(126);
    }

    $$pidref = $pid;

    my $w;
    $w = AE::child $pid => sub {
        my $status   = $_[1] >> 8;
        my $signal   = $_[1] & 127;
        my $coredump = $_[1] & 128;
        AE::log info  => "child exited with status $status" if $status;
        AE::log debug => "child exited with signal $signal" if $signal;
        AE::log note  => "child exited with coredump"       if $coredump;
        undef $w;
        map { close $_ } values %redir;
        $cv->send($status);
    };

    $cv;
}

sub new {
    my ( $class, %options ) = @_;

    $options{args} ||= [];

    my ( $rIN,  $wIN )  = _rpipe;
    my ( $rOUT, $wOUT ) = _wpipe;
    my ( $rERR, $wERR ) = _wpipe;

    my @xhs = @{ delete( $options{extras} ) || [] };

    my @args = map { "$_" } @{ delete $options{args} };

    my $pid;

    my %redir = (
        0 => $rIN,
        1 => $wOUT,
        2 => $wERR,
        map { ( "$_" => $_->B ) } @xhs
    );

    my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
    my $waiter = AE::cv;

    my $self = bless {
        handles => {
            in  => $wIN,
            out => $rOUT,
            err => $rERR,
            map { ( "$_" => $_->A ) } @xhs,
        },
        pid       => $pid,
        listeners => {
            exit       => delete $options{on_exit},
            ttl_exceed => delete $options{on_ttl_exceed},
        },
        eol     => "\n",
        cv      => $cv,
        alive   => 1,
        waiter  => $waiter,
        waiters => {
            in  => [],
            out => [],
            err => [],
            map { ( "$_" => [] ) } @xhs
        },
        reapers => [],
      } => ref $class
      || $class;

    map { $_->{proc} = $self } @xhs;

    {
        my $eol = quotemeta $self->_eol;
        $self->{reol} = delete $options{reol} || qr{$eol};
    }

    if ( $options{ttl} ) {
        $self->{timer} = AnyEvent->timer(
            after => delete $options{ttl},
            cb    => sub {
                return unless $self->alive;
                $self->kill;
                $self->_emit('ttl_exceed');
            }
        );
    }

    my $kill = sub { $self->end };

    if ( $options{timeout} ) {
        $wIN->timeout( $options{timeout} );
        $rOUT->timeout( $options{timeout} );
        $rERR->timeout( $options{timeout} );
        delete $options{timeout};

        $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
        my $cb = sub { $self->_emit('timeout') };
        $wIN->on_timeout($cb);
        $rOUT->on_timeout($cb);
        $rERR->on_timeout($cb);
    }

    if ( $options{wtimeout} ) {
        $wIN->wtimeout( delete $options{wtimeout} );

        $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('wtimeout') };
        $wIN->on_wtimeout($cb);
    }

    if ( $options{rtimeout} ) {
        $rOUT->rtimeout( delete $options{rtimeout} );

        $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('rtimeout') };
        $rOUT->on_rtimeout($cb);
    }

    if ( $options{etimeout} ) {
        $rERR->rtimeout( delete $options{etimeout} );

        $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('etimeout') };
        $rERR->on_rtimeout($cb);
    }

    if ( $options{errstr} ) {
        my $sref = delete $options{errstr};
        $$sref = '';
        $self->pipe( err => $sref );
    }

    if ( $options{outstr} ) {
        my $sref = delete $options{outstr};

lib/AnyEvent/Proc.pm  view on Meta::CPAN

        fileno => fileno( $w->fh )
      } => __PACKAGE__
      . '::W';
}

# Run a command and block until it has finished.
#
# Arguments are passed through unchanged to run_cb (binary first, then its
# arguments). After completion $? is set to the exit status shifted left by
# eight bits.
#
# Returns ( $stdout, $stderr ) in list context. In scalar context only the
# captured stdout is returned and any captured stderr is reported via carp.
sub run {
    my @command = @_;
    my $done    = AE::cv;
    my $collect = sub { $done->send( \@_ ) };

    # Wait for the process first, then pick up what the callback stored.
    run_cb( @command, $collect )->recv;
    my $result = $done->recv;
    my ( $out, $err, $status ) = @$result;

    $? = $status << 8;
    return ( $out, $err ) if wantarray;

    carp $err if $err;
    return $out;
}

# Run a command asynchronously, capturing stdout and stderr into strings.
#
# Call as run_cb( $bin, @args, $cb ): the first argument is the binary, the
# last one the callback, everything in between the command arguments.
# Once the process has been waited for, $? is set to status << 8 and the
# callback is invoked as $cb->( $stdout, $stderr, $status ).
#
# Returns whatever $proc->wait returns when given a callback.
sub run_cb {
    my ( $bin, @args ) = @_;
    my $cb = pop @args;

    my $out = '';
    my $err = '';

    my $proc = __PACKAGE__->new(
        bin    => $bin,
        args   => \@args,
        outstr => \$out,
        errstr => \$err,
    );

    # Nothing is fed to the child, so its stdin is closed right away.
    $proc->finish;

    my $on_exit = sub {
        my $status = $proc->{status};
        $? = $status << 8;
        $cb->( $out, $err, $status );
    };
    return $proc->wait($on_exit);
}

# Register (or replace) the listener for the event $name.
# Returns the handler that was stored.
sub _on {
    my $self = shift;
    my ( $event, $callback ) = @_;
    return $self->{listeners}{$event} = $callback;
}

# Accessor for the handle registered under the name 'in'.
sub in {
    my ($self) = @_;
    return $self->_geth('in');
}

# Accessor for the handle registered under the name 'out'.
sub out {
    my ($self) = @_;
    return $self->_geth('out');
}

# Accessor for the handle registered under the name 'err'.
sub err {
    my ($self) = @_;
    return $self->_geth('err');
}

# Look up a handle by name in $self->{handles}.
# The name is taken from the LAST argument, the object from the first.
sub _geth {
    my $self = shift;
    my $name = pop;
    return $self->{handles}->{$name};
}

# End-of-line string stored under {eol}.
sub _eol {
    my ($self) = @_;
    return $self->{eol};
}
# End-of-line matcher stored under {reol}.
sub _reol {
    my ($self) = @_;
    return $self->{reol};
}

# Fire the event $name: log it at debug level and, when a listener is
# registered and defined, call it as $listener->( $self, @args ).
sub _emit {
    my $self = shift;
    my ( $name, @args ) = @_;

    AE::log debug => "trapped $name";

    my $listener = $self->{listeners}->{$name};
    $listener->( $self, @args ) if defined $listener;
}

# Process id stored under {pid}.
sub pid {
    my ($self) = @_;
    return $self->{pid};
}

# Send a signal to the process identified by $self->pid.
#
# $signal defaults to 'TERM'; a leading "sig" (any case) is stripped and the
# name is upper-cased before it is handed to the kill builtin.
# Returns the result of the kill builtin.
sub fire {
    my $self = shift;
    my $name = shift;

    $name = 'TERM' if !defined $name;
    $name =~ s{^sig}{}i;

    AE::log debug => "fire SIG$name";

    # CORE:: makes explicit that the builtin is meant, not the kill method.
    return CORE::kill( uc($name), $self->pid );
}

# Shortcut for $self->fire('kill').
sub kill {
    my $self = shift;
    return $self->fire('kill');
}

# Send a signal, then send KILL if the process is still alive after a delay.
#
# Arguments are read from the end of the list:
#   fire_and_kill( [$signal,] $time [, $cb] )
# A trailing code ref is the callback, the next item is the delay in seconds
# and the one before it the signal name (default 'TERM').
#
# With a callback the result of $self->wait($wrapped_cb) is returned; without
# one the call waits via $self->wait and returns its result.
sub fire_and_kill {
    my ( $self, @rest ) = @_;

    my $cb;
    $cb = pop @rest if ref $rest[-1] eq 'CODE';
    my $grace  = pop @rest;
    my $signal = uc( pop(@rest) || 'TERM' );

    # Watchdog timer; it is dropped as soon as the wait completes.
    my $guard = AnyEvent->timer(
        after => $grace,
        cb    => sub {
            $self->kill if $self->alive;
        }
    );

    $self->fire($signal);

    unless ($cb) {
        my $exit = $self->wait;
        undef $guard;
        return $exit;
    }

    return $self->wait(
        sub {
            undef $guard;
            $cb->(@_);
        }
    );
}

sub alive {
    my $self = shift;
    return 0 unless $self->{alive};
    $self->fire(0) ? 1 : 0;

lib/AnyEvent/Proc.pm  view on Meta::CPAN

    if ($cb) {
        $self->{waiter}->cb($next);
        return $self->{waiter};
    }
    else {
        $self->{waiter}->recv;
        return $next->( $self->{waiter} );
    }
}

# Destroy the 'in' handle. Returns $self so calls can be chained.
sub finish {
    my $self  = shift;
    my $stdin = $self->in;
    $stdin->destroy;
    return $self;
}

# Destroy every handle in {handles}, then run every code ref in {reapers}.
# Returns $self.
sub end {
    my $self = shift;

    for my $handle ( values %{ $self->{handles} } ) {
        $handle->destroy;
    }
    for my $reaper ( @{ $self->{reapers} } ) {
        $reaper->();
    }

    return $self;
}

# Set the timeout of the in, out and err handles to 0 (in that order).
sub stop_timeout {
    my $self = shift;
    $self->$_->timeout(0) for qw(in out);
    return $self->err->timeout(0);
}

# Set the write timeout of the 'in' handle to 0.
sub stop_wtimeout {
    my $self  = shift;
    my $stdin = $self->in;
    return $stdin->wtimeout(0);
}

# Set the read timeout of the 'out' handle to 0.
sub stop_rtimeout {
    my $self   = shift;
    my $stdout = $self->out;
    return $stdout->rtimeout(0);
}

# Set the read timeout of the 'err' handle to 0.
sub stop_etimeout {
    my $self   = shift;
    my $stderr = $self->err;
    return $stderr->rtimeout(0);
}

# Queue data on the 'in' handle via push_write( $type => @args ).
#
# Exceptions raised while doing so are logged at warn level instead of being
# propagated. Returns 1 on success, 0 when an exception was caught.
sub write {
    my $self = shift;
    my ( $type, @args ) = @_;

    # The value of the try/catch expression is the block that ran last.
    my $ok = try {
        $self->_geth('in')->push_write( $type => @args );
        1;
    }
    catch {
        AE::log warn => $_;
        0;
    };

    return $ok;
}

# Write each of @lines followed by the configured end-of-line string.
# Returns $self.
sub writeln {
    my ( $self, @lines ) = @_;
    for my $line (@lines) {
        $self->write( $line . $self->_eol );
    }
    return $self;
}

sub pipe {
    my $self = shift;
    my $peer = pop;
    my $what = ( pop || 'out' );
    if ( ref $what ) {
        $what = "$what";
    }
    else {
        $what = lc $what;
        $what =~ s{^std}{};
    }
    use Scalar::Util qw(blessed);
    my $sub;
    if ( blessed $peer) {
        if ( $peer->isa(__PACKAGE__) ) {
            $sub = sub {
                $peer->write(shift);
              }
        }
        elsif ( $peer->isa('AnyEvent::Handle') ) {
            $sub = sub {
                $peer->push_write(shift);
              }
        }
        elsif ( $peer->isa('Coro::Channel') ) {
            $sub = sub {
                $peer->put(shift);
              }
        }
        elsif ( $peer->can('print') ) {
            $sub = sub {
                $peer->print(shift);
              }
        }
    }
    elsif ( ref $peer eq 'SCALAR' ) {
        $sub = sub {
            $$peer .= shift;
          }
    }
    elsif ( ref $peer eq 'GLOB' ) {
        $sub = sub {
            print $peer shift();
          }
    }
    elsif ( ref $peer eq 'CODE' ) {
        $sub = $peer;
    }
    if ($sub) {
        AE::log debug => "pipe $peer from $what";
        my $aeh = $self->_geth($what);
        $aeh->on_eof(
            sub {
                AE::log debug => "eof: $what";
                shift->destroy;
                $self->{waiter}->end;
            }

lib/AnyEvent/Proc.pm  view on Meta::CPAN

        elsif ( $peer->isa('IO::Handle') ) {
            return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
        }
        elsif ( $peer->isa('Coro::Channel') ) {
            if ( my $class = load_class('Coro') ) {
                return $class->new(
                    sub {
                        while ( my $x = $peer->get ) {
                            $self->write($x) or last;
                            Coro::cede();
                        }
                        $self->finish;
                    }
                );
            }
        }
    }
    elsif ( ref $peer eq 'SCALAR' ) {
        return _read_on_scalar(
            $peer,
            sub {
                AE::log debug => "pull($peer)->STORE";
                $self->write( shift() );
            }
        );
    }
    elsif ( ref $peer eq 'GLOB' ) {
        return $self->pull( AnyEvent::Handle->new( fh => $peer ) );
    }
    AE::log fatal => "cannot pull $peer to stdin";
}

# Call push_read(@args) on the handle named $what.
#
# Exceptions are logged at note level rather than propagated.
# Returns 1 when the call succeeded, 0 when an exception was caught.
sub _push_read {
    my $self = shift;
    my ( $what, @args ) = @_;

    # The value of the try/catch expression is the block that ran last.
    my $ok = try {
        $self->_geth($what)->push_read(@args);
        1;
    }
    catch {
        AE::log note => "cannot push_read from std$what: $_";
        0;
    };

    return $ok;
}

# Call unshift_read(@args) on the handle named $what.
#
# Exceptions are logged at note level rather than propagated.
# Returns 1 when the call succeeded, 0 when an exception was caught.
sub _unshift_read {
    my $self = shift;
    my ( $what, @args ) = @_;

    # The value of the try/catch expression is the block that ran last.
    my $ok = try {
        $self->_geth($what)->unshift_read(@args);
        1;
    }
    catch {
        AE::log note => "cannot unshift_read from std$what: $_";
        0;
    };

    return $ok;
}

# Queue a 'line' read on the handle $what, using the object's end-of-line
# matcher; $sub is passed on as the read callback.
# Returns the result of _push_read.
sub _readline {
    my ( $self, $what, $sub ) = @_;
    my $separator = $self->_reol;
    return $self->_push_read( $what, 'line', $separator, $sub );
}

# Queue a 'chunk' read of $bytes bytes on the handle $what; $sub is passed on
# as the read callback. Returns the result of _push_read.
sub _readchunk {
    my $self = shift;
    my ( $what, $bytes, $sub ) = @_;
    return $self->_push_read( $what, 'chunk', $bytes, $sub );
}

# Build a read callback that forwards only its second argument to the code
# ref $cb.
sub _sub_cb {
    my $cb = shift;
    return sub {
        $cb->( $_[1] );
    };
}

# Build a read callback that sends its second argument to $cv via ->send.
sub _sub_cv {
    my $cv = shift;
    return sub {
        $cv->send( $_[1] );
    };
}

# Build a read callback that puts its second argument into $ch via ->put.
sub _sub_ch {
    my $ch = shift;
    return sub {
        $ch->put( $_[1] );
    };
}

# Read one line from the handle $what and deliver it to the code ref $cb.
# The callback is first registered via _push_waiter.
# Returns the result of _readline.
sub _readline_cb {
    my $self = shift;
    my ( $what, $cb ) = @_;

    $self->_push_waiter( $what, $cb );

    my $adapter = _sub_cb($cb);
    return $self->_readline( $what, $adapter );
}

# Read one line from the handle $what and deliver it through a condition
# variable.
#
# $cv is optional; a fresh one is created when it is missing. When the read
# cannot be queued, the condvar is sent immediately without a value.
# Returns the condvar.
sub _readline_cv {
    my $self = shift;
    my ( $what, $cv ) = @_;

    $cv ||= AE::cv;
    $self->_push_waiter( $what, $cv );

    my $queued = $self->_readline( $what, _sub_cv($cv) );
    $cv->send unless $queued;

    return $cv;
}

# Read one line from the handle $what and deliver it through a channel.
#
# $channel is optional; when missing, one is created from the class returned
# by load_class('Coro::Channel'). When the read cannot be queued, the channel
# is shut down immediately. Returns the channel.
sub _readline_ch {
    my $self = shift;
    my ( $what, $channel ) = @_;

    if ( !$channel ) {
        my $class = load_class('Coro::Channel');
        $channel = $class->new if $class;
    }

    $self->_push_waiter( $what, $channel );

    my $queued = $self->_readline( $what, _sub_ch($channel) );
    $channel->shutdown unless $queued;

    return $channel;
}

# Deliver lines from the handle $what to the code ref $cb.
#
# The callback is registered via _push_waiter, then an on_read hook is
# installed on the handle which queues a line read each time it fires.
# Returns the result of on_read.
sub _readlines_cb {
    my $self = shift;
    my ( $what, $cb ) = @_;

    $self->_push_waiter( $what, $cb );

    my $handle  = $self->_geth($what);
    my $on_read = sub {
        $self->_readline( $what, _sub_cb($cb) );
    };
    return $handle->on_read($on_read);
}

sub _readlines_ch {
    my ( $self, $what, $channel ) = @_;

lib/AnyEvent/Proc.pm  view on Meta::CPAN

}

# Read one line from this pipe into a channel by delegating to the owning
# process object's _readline_ch, with this object as the handle key.
sub readline_ch {
    my $self = shift;
    my ($ch) = @_;
    my $proc = $self->{proc};
    return $proc->_readline_ch( $self, $ch );
}

# Deliver lines from this pipe to a callback by delegating to the owning
# process object's _readlines_cb, with this object as the handle key.
sub readlines_cb {
    my $self = shift;
    my ($cb) = @_;
    my $proc = $self->{proc};
    return $proc->_readlines_cb( $self, $cb );
}

# Deliver lines from this pipe into the channel $ch.
#
# Delegates to the owning process object's _readlines_ch with this object as
# the handle key and returns its result. The channel variant must be used
# here: _readlines_cb would treat $ch as a code ref.
sub readlines_ch {
    my ( $self, $ch ) = @_;
    return $self->{proc}->_readlines_ch( $self => $ch );
}

# Blocking line read: obtain a condvar from readline_cv and wait on it.
sub readline {
    my $self = shift;
    my $cv   = $self->readline_cv;
    return $cv->recv;
}

1;

package    # hidden
  AnyEvent::Proc::W;

# Wrapper object for an extra pipe that is written to through its A side.
# Instances are hashes with the keys w, r, fileno and proc; they stringify
# to the stored file descriptor number.
use overload '""' => sub { shift->{fileno} };

use Try::Tiny;

# A: the handle stored under {w} (the one this class writes to).
# B: the handle stored under {r}.
sub A { shift->{w} }
sub B { shift->{r} }

# Destroy the A handle.
sub finish {
    shift->A->destroy;
}

# Install the given callback (last argument) as on_rtimeout hook of the A
# handle.
# NOTE(review): A is the write side, so on_wtimeout may have been intended
# here - confirm before changing.
sub on_timeout {
    shift->A->on_rtimeout(pop);
}

# Disable the timeout armed for on_timeout.
# AnyEvent::Handle has no stop_rtimeout method; a timeout is switched off by
# setting it to 0, so the read timeout paired with on_rtimeout is reset here.
sub stop_timeout {
    shift->A->rtimeout(0);
}

# Queue data on the A handle via push_write( $type => @args ).
# Exceptions are logged at note level instead of being propagated.
# Returns 1 on success, 0 when an exception was caught.
sub write {
    my ( $self, $type, @args ) = @_;
    my $ok = 0;
    try {
        $self->A->push_write( $type => @args );
        $ok = 1;
    }
    catch {
        AE::log note => $_;
    };
    $ok;
}

# Write each of @lines followed by the owning process object's end-of-line
# string. Returns $self.
sub writeln {
    my ( $self, @lines ) = @_;
    my $eol = $self->{proc}->_eol;
    $self->write( $_ . $eol ) for @lines;
    $self;
}

# Not implemented for extra pipes; always dies.
sub pull { die 'UNIMPLEMENTED' }

1;

package    # hidden
  AnyEvent::Proc::TiedScalar;

# Tie class that turns assignments to a scalar into calls of a code ref.

use Tie::Scalar;

our @ISA = ('Tie::Scalar');

# tie $scalar, CLASS, ..., $code
# The last argument is blessed into the class and becomes the tied object.
sub TIESCALAR {
    my $class   = shift;
    my $handler = pop;
    return bless $handler, $class;
}

# Reading the tied scalar always yields undef.
sub FETCH {
    return undef;
}

# Storing a value calls the tied code ref with that value.
sub STORE {
    my $handler = shift;
    my $value   = pop;
    return $handler->($value);
}

1;

__END__

=pod

=head1 NAME

AnyEvent::Proc - Run external commands

=head1 VERSION

version 0.105

=head1 SYNOPSIS

	my $proc = AnyEvent::Proc->new(bin => 'cat');
	$proc->writeln('hello');
	my $hello = $proc->readline;
	$proc->fire;
	$proc->wait;

=head1 DESCRIPTION

AnyEvent::Proc is an L<AnyEvent>-based helper class for running external commands with full control over STDIN, STDOUT and STDERR.

=head1 METHODS

=head2 new(%options)

=over 4

=item * I<bin> (mandatory)



( run in 1.784 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )