AnyEvent-Proc
view release on metacpan or search on metacpan
lib/AnyEvent/Proc.pm view on Meta::CPAN
POSIX::_exit(126);
}
$$pidref = $pid;
my $w;
$w = AE::child $pid => sub {
my $status = $_[1] >> 8;
my $signal = $_[1] & 127;
my $coredump = $_[1] & 128;
AE::log info => "child exited with status $status" if $status;
AE::log debug => "child exited with signal $signal" if $signal;
AE::log note => "child exited with coredump" if $coredump;
undef $w;
map { close $_ } values %redir;
$cv->send($status);
};
$cv;
}
# Constructor: starts the command given in `bin` (with `args`) through
# _run_cmd and returns an object holding the parent-side handles for the
# child's STDIN, STDOUT and STDERR plus any extra handles.
#
# Options handled here (each one is deleted from %options once consumed):
#   bin, args, extras,
#   on_exit, on_ttl_exceed, reol, ttl,
#   timeout / on_timeout, wtimeout / on_wtimeout,
#   rtimeout / on_rtimeout, etimeout / on_etimeout,
#   errstr, outstr
# Whatever is still left in %options at the end is reported through AE::log
# at "note" level.
#
# NOTE(review): `ttl` and the on_*timeout handlers are only consumed inside
# the `if` of their matching (true) timeout option; given on their own, or
# with a false timeout value, they end up in the left-over report.
sub new {
my ( $class, %options ) = @_;
$options{args} ||= [];
# One pipe pair per standard stream. _rpipe/_wpipe are defined outside this
# view; judging by the usage below, $wIN, $rOUT and $rERR are the ends kept by
# the parent (they accept timeout calls), while $rIN, $wOUT and $wERR are
# handed to _run_cmd -- TODO confirm against the helper definitions.
my ( $rIN, $wIN ) = _rpipe;
my ( $rOUT, $wOUT ) = _wpipe;
my ( $rERR, $wERR ) = _wpipe;
# Extra handles: each one must provide A and B methods and is used as a
# hash key via its string form.
my @xhs = @{ delete( $options{extras} ) || [] };
# Force every argument into its string form.
my @args = map { "$_" } @{ delete $options{args} };
my $pid;
# Redirection table passed to _run_cmd: keys 0/1/2 (presumably the child's
# file descriptor numbers) plus one entry per extra handle using its B side.
my %redir = (
0 => $rIN,
1 => $wOUT,
2 => $wERR,
map { ( "$_" => $_->B ) } @xhs
);
# _run_cmd receives a reference to $pid so it can store the child's PID there;
# its return value is kept as the exit condvar (see the callback installed
# further down, which reads the exit status from it).
my $cv = _run_cmd( [ delete $options{bin} => @args ], \%redir, \$pid );
# Separate condvar used with begin/end; wait() blocks on this one.
my $waiter = AE::cv;
my $self = bless {
handles => {
in => $wIN,
out => $rOUT,
err => $rERR,
map { ( "$_" => $_->A ) } @xhs,
},
pid => $pid,
listeners => {
exit => delete $options{on_exit},
ttl_exceed => delete $options{on_ttl_exceed},
},
eol => "\n",
cv => $cv,
alive => 1,
waiter => $waiter,
waiters => {
in => [],
out => [],
err => [],
map { ( "$_" => [] ) } @xhs
},
reapers => [],
# `ref $class || $class` lets new() be called on an instance as well as on
# the class name.
} => ref $class
|| $class;
# Give every extra handle a back-reference to the new object.
map { $_->{proc} = $self } @xhs;
{
# Unless `reol` is supplied, it defaults to a regex matching the literal
# end-of-line string.
my $eol = quotemeta $self->_eol;
$self->{reol} = delete $options{reol} || qr{$eol};
}
# Time-to-live: once `ttl` seconds have passed, kill the child (if it is
# still alive) and emit `ttl_exceed`.
if ( $options{ttl} ) {
$self->{timer} = AnyEvent->timer(
after => delete $options{ttl},
cb => sub {
return unless $self->alive;
$self->kill;
$self->_emit('ttl_exceed');
}
);
}
# Default handler for all timeout events. Despite its name it calls end(),
# not kill().
my $kill = sub { $self->end };
# General inactivity timeout, applied to all three standard handles.
if ( $options{timeout} ) {
$wIN->timeout( $options{timeout} );
$rOUT->timeout( $options{timeout} );
$rERR->timeout( $options{timeout} );
delete $options{timeout};
$self->_on( timeout => ( delete( $options{on_timeout} ) || $kill ) );
my $cb = sub { $self->_emit('timeout') };
$wIN->on_timeout($cb);
$rOUT->on_timeout($cb);
$rERR->on_timeout($cb);
}
# Write timeout on the STDIN handle.
if ( $options{wtimeout} ) {
$wIN->wtimeout( delete $options{wtimeout} );
$self->_on( wtimeout => ( delete( $options{on_wtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('wtimeout') };
$wIN->on_wtimeout($cb);
}
# Read timeout on the STDOUT handle.
if ( $options{rtimeout} ) {
$rOUT->rtimeout( delete $options{rtimeout} );
$self->_on( rtimeout => ( delete( $options{on_rtimeout} ) || $kill ) );
my $cb = sub { $self->_emit('rtimeout') };
$rOUT->on_rtimeout($cb);
}
# Read timeout on the STDERR handle; reported as the `etimeout` event.
if ( $options{etimeout} ) {
$rERR->rtimeout( delete $options{etimeout} );
$self->_on( etimeout => ( delete( $options{on_etimeout} ) || $kill ) );
my $cb = sub { $self->_emit('etimeout') };
$rERR->on_rtimeout($cb);
}
# `errstr` / `outstr` are scalar references: each is reset to the empty
# string and then handed to pipe() for the err / out stream.
if ( $options{errstr} ) {
my $sref = delete $options{errstr};
$$sref = '';
$self->pipe( err => $sref );
}
if ( $options{outstr} ) {
my $sref = delete $options{outstr};
$$sref = '';
$self->pipe( out => $sref );
}
# The waiter is held open until the exit condvar fires; the callback records
# the status, marks the object as no longer alive, drops the ttl timer,
# releases the waiter and finally emits `exit` with the status.
$waiter->begin;
$cv->cb(
sub {
$self->{status} = shift->recv;
$self->{alive} = 0;
undef $self->{timer};
$waiter->end;
$self->_emit( exit => $self->{status} );
}
);
# Report options nobody consumed.
if ( keys %options ) {
AE::log note => "unknown left-over option(s): " . join ', ' =>
keys %options;
}
$self;
}
# Builds an extra-handle object of class <package>::R from a fresh _wpipe
# pair. The object stores both pipe ends and the file descriptor number of
# the `r` end.
sub reader {
    my ( $read_end, $write_end ) = _wpipe();
    my %fields = (
        r      => $read_end,
        w      => $write_end,
        fileno => fileno( $read_end->fh ),
    );
    return bless \%fields, __PACKAGE__ . '::R';
}
# Builds an extra-handle object of class <package>::W from a fresh _rpipe
# pair. The object stores both pipe ends and the file descriptor number of
# the `w` end.
sub writer {
    my ( $read_end, $write_end ) = _rpipe();
    my %fields = (
        r      => $read_end,
        w      => $write_end,
        fileno => fileno( $write_end->fh ),
    );
    return bless \%fields, __PACKAGE__ . '::W';
}
# Synchronous front end for run_cb(): forwards all arguments to run_cb,
# blocks until it has finished and sets $? to the exit status shifted left
# by 8 bits.
#
# List context:   returns ( $out, $err ).
# Scalar context: returns $out, and carps $err when it is true.
sub run {
    my $done = AE::cv;

    # run_cb hands (out, err, status) to this callback; keep them for later.
    my $collect = sub { $done->send( \@_ ) };
    run_cb( @_, $collect )->recv;

    my ( $out, $err, $status ) = @{ $done->recv };
    $? = $status << 8;

    return ( $out, $err ) if wantarray;

    carp $err if $err;
    return $out;
}
sub run_cb {
my $bin = shift;
my $cb = pop;
my @args = @_;
my ( $out, $err ) = ( '', '' );
my $proc = __PACKAGE__->new(
bin => $bin,
lib/AnyEvent/Proc.pm view on Meta::CPAN
my $status = $proc->{status};
$? = $status << 8;
$cb->( $out, $err, $status );
}
);
}
# Registers $handler as the listener for event $name, replacing any
# listener previously stored under that name.
sub _on {
    my $self = shift;
    my ( $event, $callback ) = @_;
    $self->{listeners}{$event} = $callback;
}
# Accessors for the standard stream handles; each one looks up the handle
# of the same name through _geth.
sub in {
    my ($self) = @_;
    return $self->_geth('in');
}

sub out {
    my ($self) = @_;
    return $self->_geth('out');
}

sub err {
    my ($self) = @_;
    return $self->_geth('err');
}
# Returns the handle stored under the given name. The name is taken from the
# LAST argument, the object from the first.
sub _geth {
    my $self = shift;
    my $name = pop;
    return $self->{handles}->{$name};
}
# Returns the end-of-line string stored in the object.
sub _eol {
    my ($self) = @_;
    return $self->{eol};
}

# Returns the end-of-line regex stored in the object.
sub _reol {
    my ($self) = @_;
    return $self->{reol};
}
# Fires event $name: logs it at debug level and, when a listener is
# registered (and defined) for that name, calls it with the object followed
# by the remaining arguments.
sub _emit {
    my $self  = shift;
    my $name  = shift;
    my @args  = @_;
    AE::log debug => "trapped $name";
    my $listener = $self->{listeners}->{$name};
    if ( defined $listener ) {
        $listener->( $self, @args );
    }
}
# Returns the PID stored in the object.
sub pid {
    my ($self) = @_;
    return $self->{pid};
}
# Sends a signal to the subprocess and returns the result of the built-in
# kill. $signal defaults to 'TERM'; a leading "sig" (any case) is stripped
# and the name is upper-cased before it is sent.
sub fire {
    my $self   = shift;
    my $signal = shift;
    $signal = 'TERM' if !defined $signal;
    $signal =~ s{^sig}{}i;
    AE::log debug => "fire SIG$signal";
    return kill( uc($signal), $self->pid );
}
# Shorthand for fire('kill').
sub kill {
    my $self = shift;
    return $self->fire('kill');
}
# fire_and_kill([$signal, ]$time[, $callback])
#
# Fires $signal (default 'TERM') and arms a timer that kills the subprocess
# after $time seconds if it is still alive. Arguments are read from the end:
# an optional trailing code reference is the callback, then the time, then
# the optional signal.
#
# With a callback:    returns whatever wait($wrapper) returns; the wrapper
#                     cancels the timer and then calls the callback.
# Without a callback: blocks in wait(), cancels the timer and returns the
#                     value wait() returned.
sub fire_and_kill {
    my ( $self, @params ) = @_;

    my $cb     = ref $params[-1] eq 'CODE' ? pop @params : undef;
    my $time   = pop @params;
    my $signal = uc( pop(@params) || 'TERM' );

    # Escalation timer: hard kill once the grace period is over.
    my $escalation = AnyEvent->timer(
        after => $time,
        cb    => sub {
            return unless $self->alive;
            $self->kill;
        }
    );

    $self->fire($signal);

    unless ($cb) {
        my $exit = $self->wait;
        undef $escalation;
        return $exit;
    }

    return $self->wait(
        sub {
            undef $escalation;
            $cb->(@_);
        }
    );
}
# Returns 1 when the object's own alive flag is set AND fire(0) reports
# success, otherwise 0.
sub alive {
    my ($self) = @_;
    return ( $self->{alive} && $self->fire(0) ) ? 1 : 0;
}
# wait([$callback])
#
# Waits on the object's waiter condvar. Once it is ready, the finishing step
# calls waitpid on the stored PID, invokes $callback with the exit status
# (only when it is a code reference), calls end() and yields the status.
#
# With a true $callback:  installs the finishing step as the waiter's
#                         callback and returns the waiter condvar.
# Without:                blocks on the waiter, runs the finishing step and
#                         returns the exit status.
sub wait {
    my ( $self, $cb ) = @_;

    my $finalize = sub {
        my ($condvar) = @_;
        $condvar->recv;
        waitpid $self->{pid}, 0;
        if ( ref $cb eq 'CODE' ) {
            $cb->( $self->{status} );
        }
        $self->end;
        return $self->{status};
    };

    AE::log debug => "waiting for "
      . ( $self->{waiter}->{_ae_counter} ) . " ends";

    unless ($cb) {
        $self->{waiter}->recv;
        return $finalize->( $self->{waiter} );
    }

    $self->{waiter}->cb($finalize);
    return $self->{waiter};
}
# Destroys the STDIN handle and returns the object itself, so calls can be
# chained.
sub finish {
    my $self  = shift;
    my $stdin = $self->in;
    $stdin->destroy;
    return $self;
}
# Destroys every handle stored in the object, then calls every registered
# reaper callback (without arguments). Returns the object itself, so calls
# can be chained.
sub end {
    my ($self) = @_;

    # Plain loops instead of map: the calls are made for their side effects
    # only, no result list is needed.
    $_->destroy for values %{ $self->{handles} };

    # Iterate over a copy so the set of reapers that run is fixed up front,
    # even if a reaper modifies the list.
    my @reapers = @{ $self->{reapers} };
    $_->() for @reapers;

    return $self;
}
# Sets the general timeout of the STDIN, STDOUT and STDERR handles to 0.
sub stop_timeout {
    my $self = shift;
    $self->in->timeout(0);
    $self->out->timeout(0);
    $self->err->timeout(0);
}
# Sets the write timeout of the STDIN handle to 0.
sub stop_wtimeout {
    my $self = shift;
    $self->in->wtimeout(0);
}
# Sets the read timeout of the STDOUT handle to 0.
sub stop_rtimeout {
    my $self = shift;
    $self->out->rtimeout(0);
}
# Sets the read timeout of the STDERR handle to 0.
sub stop_etimeout {
    my $self = shift;
    $self->err->rtimeout(0);
}
lib/AnyEvent/Proc.pm view on Meta::CPAN
=item * I<on_ttl_exceed>
Callback handler called when I<ttl> is exceeded
=item * I<on_timeout>
Callback handler called when any inactivity I<timeout> value is exceeded
=item * I<on_wtimeout>
Callback handler called when the STDIN write inactivity I<wtimeout> value is exceeded
=item * I<on_rtimeout>
Callback handler called when the STDOUT read inactivity I<rtimeout> value is exceeded
=item * I<on_etimeout>
Callback handler called when the STDERR read inactivity I<etimeout> value is exceeded
=back
=head2 in()
Returns a L<AnyEvent::Handle> for STDIN
Useful for piping data into us:
$socket->print($proc->in->fh)
=head2 out()
Returns a L<AnyEvent::Handle> for STDOUT
=head2 err()
Returns a L<AnyEvent::Handle> for STDERR
=head2 pid()
Returns the PID of the subprocess
=head2 fire([$signal])
Sends a named signal to the subprocess. C<$signal> defaults to I<TERM> if omitted.
=head2 kill()
Kills the subprocess in the most brutal way. Equivalent to
$proc->fire('kill')
=head2 fire_and_kill([$signal, ]$time[, $callback])
Fires specified signal C<$signal> (or I<TERM> if omitted) and after C<$time> seconds kills the subprocess.
See L</wait> for the meaning of the callback parameter and return value.
Without callback, this is a synchronous call. After this call, the subprocess can be considered to be dead. Returns the exit code of the subprocess.
=head2 alive()
Checks whether the subprocess is still alive. Returns I<1> or I<0>.
In fact, the method equals to
$proc->fire(0)
=head2 wait([$callback])
Waits for the subprocess to finish, then calls the callback with the exit code. Returns a condvar.
Without callback, this is a synchronous call directly returning the exit code.
=head2 finish()
Closes STDIN of subprocess
=head2 end()
Closes all handles of subprocess
=head2 stop_timeout()
Stops read/write timeout for STDIN, STDOUT and STDERR.
See I<timeout> and I<on_timeout> options in I<new()>.
=head2 stop_wtimeout()
Stops write timeout for STDIN.
See I<wtimeout> and I<on_wtimeout> options in I<new()>.
=head2 stop_rtimeout()
Stops read timeout for STDOUT.
See I<rtimeout> and I<on_rtimeout> options in I<new()>.
=head2 stop_etimeout()
Stops read timeout for STDERR.
See I<etimeout> and I<on_etimeout> options in I<new()>.
=head2 write($scalar)
Queues the given scalar to be written.
=head2 write($type => @args)
See L<AnyEvent::Handle>::push_write for more information.
=head2 writeln(@lines)
Queues one or more line to be written.
=head2 pipe([$fd, ]$peer)
Pipes any output of STDOUT to another handle. C<$peer> may be another L<AnyEvent::Proc> instance, an L<AnyEvent::Handle>, a L<Coro::Channel>, an object that implements the I<print> method (like L<IO::Handle>, including any subclass), a ScalarRef or a ...
C<$fd> defaults to I<stdout>.
( run in 1.521 second using v1.01-cache-2.11-cpan-df04353d9ac )