AnyEvent-Process
view release on metacpan or search on metacpan
lib/AnyEvent/Process.pm view on Meta::CPAN
package AnyEvent::Process::Job;
use strict;
sub new {
my ($ref, $pid) = @_;
my $self = bless {pid => $pid, cbs => [], handles => [], timers => []}, $ref;
return $self;
}
sub kill {
my ($self, $signal) = @_;
return kill $signal, $self->{pid};
}
sub pid {
return $_[0]->{pid};
}
sub _add_cb {
my ($self, $cb) = @_;
push @{$self->{cbs}}, $cb;
}
sub _add_handle {
my ($self, $handle) = @_;
push @{$self->{handles}}, $handle;
}
sub _add_timer {
my ($self, $timer) = @_;
push @{$self->{timers}}, $timer;
}
sub _remove_cbs {
undef $_[0]->{cbs};
}
sub _remove_timers {
my $self = shift;
undef $_ foreach @{$self->{timers}};
undef $self->{timers};
}
sub close {
my $self = shift;
undef $_ foreach @{$self->{handles}};
undef $self->{handles};
}
package AnyEvent::Process;
use strict;
use AnyEvent::Handle;
use AnyEvent::Util;
use AnyEvent;
use Carp;
our @proc_args = qw(fh_table code on_completion args watchdog_interval on_watchdog kill_interval on_kill close_all_fds_except);
our $VERSION = '0.02';
my $nop = sub {};
sub _yield {
my $cv_yield = AE::cv;
AE::postpone { $cv_yield->send };
$cv_yield->recv;
}
# Create a callback factory. This is needed to execute on_completion after all
# other callbacks.
sub _create_callback_factory {
my $on_completion = shift // $nop;
my $counter = 0;
my @on_completion_args;
my $factory = sub {
my $func = shift // $nop;
$counter++;
return sub {
my ($err, $rtn);
eval {
$rtn = $func->(@_);
}; $err = $@;
if (--$counter == 0) {
eval {
$on_completion->(@on_completion_args);
}; $err = $err || $@;
$on_completion_args[0]->_remove_cbs;
}
if ($err) {
croak $err;
}
return $rtn;
}
};
my $set_on_completion_args = sub {
lib/AnyEvent/Process.pm view on Meta::CPAN
open $dup->[0], '+>&', $dup->[1];
close $dup->[1];
}
# Close handles
foreach my $dup (@handles) {
close $dup->[0];
}
# Close other filedescriptors
if (defined $proc_args{close_all_fds_except}) {
my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
push @not_close, fileno $_->[0] foreach @fh_table;
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
AnyEvent::Util::close_all_fds_except @not_close;
}
# Run the code
my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
exit ($rtn eq int($rtn) ? $rtn : 1);
} else {
AE::log info => "Forked new process $pid.";
$job = new AnyEvent::Process::Job($pid);
# Close FDs
foreach my $dup (@fh_table) {
AE::log trace => "Closing $dup->[1].";
close $dup->[1];
}
# Create handles
foreach my $handle (@handles) {
my (@hdl_args, @hdl_calls);
for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
if ($handle->[1][$i] eq 'on_eof') {
push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
} else {
push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
}
} else {
push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
}
}
AE::log trace => "Creating handle " . join ' ', @hdl_args;
my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
foreach my $call (@hdl_calls) {
no strict 'refs';
my $method = $call->[0];
AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
$hdl->$method(@{$call->[1]});
}
$job->_add_handle($hdl);
}
# Create callbacks
my $completion_cb = sub {
$job->_remove_timers();
AE::log info => "Process $job->{pid} finished with code $_[1].";
$set_on_completion_args->($job, $_[1]);
};
$job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));
$self->{job} = $job;
# Create watchdog and kill timers
my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
if (defined $proc_args{kill_interval}) {
my $kill_cb = sub {
$job->_remove_timers();
AE::log warn => "Process $job->{pid} is running too long, killing it.";
$on_kill->($job);
};
$job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
}
if (defined $proc_args{watchdog_interval} or defined $proc_args{on_watchdog}) {
unless (defined $proc_args{watchdog_interval} &&
defined $proc_args{on_watchdog}) {
croak "Both or none of watchdog_interval and on_watchdog must be defined";
}
my $watchdog_cb = sub {
AE::log info => "Executing watchdog for process $job->{pid}.";
unless ($proc_args{on_watchdog}->($job)) {
$job->_remove_timers();
AE::log warn => "Watchdog for process $job->{pid} failed, killing it.";
$on_kill->($job);
}
};
$job->_add_timer(AE::timer $proc_args{watchdog_interval}, $proc_args{watchdog_interval}, $watchdog_cb);
}
}
# We need this to allow AE collecting pending signals and prevent accumulation of zombies
$self->_yield;
return $job;
}
sub kill {
my ($self, $signal) = @_;
croak 'No process was started' unless defined $self->{job};
return $self->{job}->kill($signal // 15);
}
sub pid {
my $self = shift;
croak 'No process was started' unless defined $self->{job};
return $self->{job}->pid();
}
sub close {
my $self = shift;
croak 'No process was started' unless defined $self->{job};
return $self->{job}->close();
}
1;
__END__
=head1 NAME
AnyEvent::Process - Start a process and watch for events
=head1 SYNOPSIS
use AnyEvent::Process;
my $proc = new AnyEvent::Process(
fh_table => [
# Connect OUTPIPE file handle to STDIN of a new process
\*STDIN => ['pipe', '<', \*OUTPIPE],
# Connect INPIPE file handle to STDOUT of a new process
\*STDOUT => ['pipe', '>', \*INPIPE],
# Print everything written to STDERR by a new process to STDERR of current
# process, but prefix every line with 'bc ERROR: '
\*STDERR => ['decorate', '>', 'bc ERROR: ', \*STDERR]
],
# We don't want to wait longer than 10 seconds, so kill bc after that time
kill_interval => 10,
# Execute bc in a new process
code => sub {
exec 'bc', '-q';
});
# Start bc in a new process
( run in 0.975 second using v1.01-cache-2.11-cpan-02777c243ea )