AnyEvent-Proc

 view release on metacpan or  search on metacpan

lib/AnyEvent/Proc.pm  view on Meta::CPAN


sub new {
    my ( $class, %options ) = @_;

    $options{args} ||= [];

    my ( $rIN,  $wIN )  = _rpipe;
    my ( $rOUT, $wOUT ) = _wpipe;
    my ( $rERR, $wERR ) = _wpipe;

    my @xhs = @{ delete( $options{extras} ) || [] };

    my @args = map { "$_" } @{ delete $options{args} };

    my $pid;

    my %redir = (
        0 => $rIN,
        1 => $wOUT,
        2 => $wERR,
        map { ( "$_" => $_->B ) } @xhs
    );

    my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
    my $waiter = AE::cv;

    my $self = bless {
        handles => {
            in  => $wIN,
            out => $rOUT,
            err => $rERR,
            map { ( "$_" => $_->A ) } @xhs,
        },
        pid       => $pid,
        listeners => {
            exit       => delete $options{on_exit},
            ttl_exceed => delete $options{on_ttl_exceed},
        },
        eol     => "\n",
        cv      => $cv,
        alive   => 1,
        waiter  => $waiter,
        waiters => {
            in  => [],
            out => [],
            err => [],
            map { ( "$_" => [] ) } @xhs
        },
        reapers => [],
      } => ref $class
      || $class;

    map { $_->{proc} = $self } @xhs;

    {
        my $eol = quotemeta $self->_eol;
        $self->{reol} = delete $options{reol} || qr{$eol};
    }

    if ( $options{ttl} ) {
        $self->{timer} = AnyEvent->timer(
            after => delete $options{ttl},
            cb    => sub {
                return unless $self->alive;
                $self->kill;
                $self->_emit('ttl_exceed');
            }
        );
    }

    my $kill = sub { $self->end };

    if ( $options{timeout} ) {
        $wIN->timeout( $options{timeout} );
        $rOUT->timeout( $options{timeout} );
        $rERR->timeout( $options{timeout} );
        delete $options{timeout};

        $self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
        my $cb = sub { $self->_emit('timeout') };
        $wIN->on_timeout($cb);
        $rOUT->on_timeout($cb);
        $rERR->on_timeout($cb);
    }

    if ( $options{wtimeout} ) {
        $wIN->wtimeout( delete $options{wtimeout} );

        $self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('wtimeout') };
        $wIN->on_wtimeout($cb);
    }

    if ( $options{rtimeout} ) {
        $rOUT->rtimeout( delete $options{rtimeout} );

        $self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('rtimeout') };
        $rOUT->on_rtimeout($cb);
    }

    if ( $options{etimeout} ) {
        $rERR->rtimeout( delete $options{etimeout} );

        $self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
        my $cb = sub { $self->_emit('etimeout') };
        $rERR->on_rtimeout($cb);
    }

    if ( $options{errstr} ) {
        my $sref = delete $options{errstr};
        $$sref = '';
        $self->pipe( err => $sref );
    }

    if ( $options{outstr} ) {
        my $sref = delete $options{outstr};
        $$sref = '';
        $self->pipe( out => $sref );
    }

    $waiter->begin;
    $cv->cb(
        sub {
            $self->{status} = shift->recv;
            $self->{alive}  = 0;
            undef $self->{timer};
            $waiter->end;
            $self->_emit( exit => $self->{status} );
        }
    );

    if ( keys %options ) {
        AE::log note => "unknown left-over option(s): " . join ', ' =>
          keys %options;
    }

    $self;
}

sub reader {
    my ( $r, $w ) = _wpipe;
    bless {
        r      => $r,
        w      => $w,
        fileno => fileno( $r->fh )
      } => __PACKAGE__
      . '::R';
}

sub writer {
    my ( $r, $w ) = _rpipe;
    bless {
        r      => $r,
        w      => $w,
        fileno => fileno( $w->fh )
      } => __PACKAGE__
      . '::W';
}

sub run {
    my $cv = AE::cv;
    run_cb(
        @_,
        sub {
            $cv->send( \@_ );
        }
    )->recv;
    my ( $out, $err, $status ) = @{ $cv->recv };
    $? = $status << 8;
    if (wantarray) {
        return ( $out, $err );
    }
    else {
        carp $err if $err;
        return $out;
    }
}

sub run_cb {
    my $bin  = shift;
    my $cb   = pop;
    my @args = @_;
    my ( $out, $err ) = ( '', '' );
    my $proc = __PACKAGE__->new(
        bin    => $bin,
        args   => \@args,

lib/AnyEvent/Proc.pm  view on Meta::CPAN

    $proc->finish;
    $proc->wait(
        sub {
            my $status = $proc->{status};
            $? = $status << 8;
            $cb->( $out, $err, $status );
        }
    );
}

sub _on {
    my ( $self, $name, $handler ) = @_;
    $self->{listeners}->{$name} = $handler;
}

sub in { shift->_geth('in') }

sub out { shift->_geth('out') }

sub err { shift->_geth('err') }

sub _geth {
    shift->{handles}->{ pop() };
}

sub _eol  { shift->{eol} }
sub _reol { shift->{reol} }

sub _emit {
    my ( $self, $name, @args ) = @_;
    AE::log debug => "trapped $name";
    if ( exists $self->{listeners}->{$name}
        and defined $self->{listeners}->{$name} )
    {
        $self->{listeners}->{$name}->( $self, @args );
    }
}

sub pid {
    shift->{pid};
}

sub fire {
    my ( $self, $signal ) = @_;
    $signal = 'TERM' unless defined $signal;
    $signal =~ s{^sig}{}i;
    AE::log debug   => "fire SIG$signal";
    kill uc $signal => $self->pid;
}

sub kill {
    my ($self) = @_;
    $self->fire('kill');
}

sub fire_and_kill {
    my $self   = shift;
    my $cb     = ( ref $_[-1] eq 'CODE' ? pop : undef );
    my $time   = pop;
    my $signal = uc( pop || 'TERM' );
    my $w      = AnyEvent->timer(
        after => $time,
        cb    => sub {
            return unless $self->alive;
            $self->kill;
        }
    );
    $self->fire($signal);
    if ($cb) {
        return $self->wait(
            sub {
                undef $w;
                $cb->(@_);
            }
        );
    }
    else {
        my $exit = $self->wait;
        undef $w;
        return $exit;
    }
}

sub alive {
    my $self = shift;
    return 0 unless $self->{alive};
    $self->fire(0) ? 1 : 0;
}

sub wait {
    my ( $self, $cb ) = @_;

    my $next = sub {
        my $cv = shift;
        $cv->recv;
        waitpid $self->{pid} => 0;
        $cb->( $self->{status} ) if ref $cb eq 'CODE';
        $self->end;
        $self->{status};
    };
    AE::log debug => "waiting for "
      . ( $self->{waiter}->{_ae_counter} ) . " ends";
    if ($cb) {
        $self->{waiter}->cb($next);
        return $self->{waiter};
    }
    else {
        $self->{waiter}->recv;
        return $next->( $self->{waiter} );
    }
}

sub finish {
    my ($self) = @_;
    $self->in->destroy;
    $self;
}

sub end {
    my ($self) = @_;
    map { $_->destroy } values %{ $self->{handles} };



( run in 0.886 second using v1.01-cache-2.11-cpan-39bf76dae61 )