AnyEvent-Process
view release on metacpan or search on metacpan
lib/AnyEvent/Process.pm view on Meta::CPAN
my ($self, $signal) = @_;
return kill $signal, $self->{pid};
}
sub pid {
return $_[0]->{pid};
}
sub _add_cb {
my ($self, $cb) = @_;
push @{$self->{cbs}}, $cb;
}
sub _add_handle {
my ($self, $handle) = @_;
push @{$self->{handles}}, $handle;
}
sub _add_timer {
my ($self, $timer) = @_;
push @{$self->{timers}}, $timer;
}
sub _remove_cbs {
undef $_[0]->{cbs};
}
sub _remove_timers {
my $self = shift;
undef $_ foreach @{$self->{timers}};
undef $self->{timers};
}
sub close {
my $self = shift;
undef $_ foreach @{$self->{handles}};
undef $self->{handles};
}
package AnyEvent::Process;
use strict;
use AnyEvent::Handle;
use AnyEvent::Util;
use AnyEvent;
use Carp;
our @proc_args = qw(fh_table code on_completion args watchdog_interval on_watchdog kill_interval on_kill close_all_fds_except);
our $VERSION = '0.02';
my $nop = sub {};
sub _yield {
my $cv_yield = AE::cv;
AE::postpone { $cv_yield->send };
$cv_yield->recv;
}
# Create a callback factory. This is needed to execute on_completion after all
# other callbacks.
sub _create_callback_factory {
my $on_completion = shift // $nop;
my $counter = 0;
my @on_completion_args;
my $factory = sub {
my $func = shift // $nop;
$counter++;
return sub {
my ($err, $rtn);
eval {
$rtn = $func->(@_);
}; $err = $@;
if (--$counter == 0) {
eval {
$on_completion->(@on_completion_args);
}; $err = $err || $@;
$on_completion_args[0]->_remove_cbs;
}
if ($err) {
croak $err;
}
return $rtn;
}
};
my $set_on_completion_args = sub {
@on_completion_args = @_;
};
return $factory, $set_on_completion_args;
}
sub new {
my $ref = shift;
my $self = bless {}, $ref;
my %args = @_;
foreach my $arg (@proc_args) {
$self->{$arg} = delete $args{$arg} if defined $args{$arg};
}
if (%args) {
croak 'Unknown arguments: ' . join ', ', keys %args;
}
return $self;
}
sub run {
my $self = shift;
my %args = @_;
my %proc_args;
my @fh_table;
my @handles;
lib/AnyEvent/Process.pm view on Meta::CPAN
# Duplicate FDs
foreach my $dup (@fh_table) {
open $dup->[0], '+>&', $dup->[1];
close $dup->[1];
}
# Close handles
foreach my $dup (@handles) {
close $dup->[0];
}
# Close other filedescriptors
if (defined $proc_args{close_all_fds_except}) {
my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
push @not_close, fileno $_->[0] foreach @fh_table;
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
AnyEvent::Util::close_all_fds_except @not_close;
}
# Run the code
my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
exit ($rtn eq int($rtn) ? $rtn : 1);
} else {
AE::log info => "Forked new process $pid.";
$job = new AnyEvent::Process::Job($pid);
# Close FDs
foreach my $dup (@fh_table) {
AE::log trace => "Closing $dup->[1].";
close $dup->[1];
}
# Create handles
foreach my $handle (@handles) {
my (@hdl_args, @hdl_calls);
for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
if ($handle->[1][$i] eq 'on_eof') {
push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
} else {
push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
}
} else {
push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
}
}
AE::log trace => "Creating handle " . join ' ', @hdl_args;
my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
foreach my $call (@hdl_calls) {
no strict 'refs';
my $method = $call->[0];
AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
$hdl->$method(@{$call->[1]});
}
$job->_add_handle($hdl);
}
# Create callbacks
my $completion_cb = sub {
$job->_remove_timers();
AE::log info => "Process $job->{pid} finished with code $_[1].";
$set_on_completion_args->($job, $_[1]);
};
$job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));
$self->{job} = $job;
# Create watchdog and kill timers
my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
if (defined $proc_args{kill_interval}) {
my $kill_cb = sub {
$job->_remove_timers();
AE::log warn => "Process $job->{pid} is running too long, killing it.";
$on_kill->($job);
};
$job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
}
if (defined $proc_args{watchdog_interval} or defined $proc_args{on_watchdog}) {
unless (defined $proc_args{watchdog_interval} &&
defined $proc_args{on_watchdog}) {
croak "Both or none of watchdog_interval and on_watchdog must be defined";
}
my $watchdog_cb = sub {
AE::log info => "Executing watchdog for process $job->{pid}.";
unless ($proc_args{on_watchdog}->($job)) {
$job->_remove_timers();
AE::log warn => "Watchdog for process $job->{pid} failed, killing it.";
$on_kill->($job);
}
};
$job->_add_timer(AE::timer $proc_args{watchdog_interval}, $proc_args{watchdog_interval}, $watchdog_cb);
}
}
# We need this to allow AE collecting pending signals and prevent accumulation of zombies
$self->_yield;
return $job;
}
sub kill {
my ($self, $signal) = @_;
croak 'No process was started' unless defined $self->{job};
return $self->{job}->kill($signal // 15);
}
sub pid {
my $self = shift;
croak 'No process was started' unless defined $self->{job};
return $self->{job}->pid();
}
sub close {
my $self = shift;
lib/AnyEvent/Process.pm view on Meta::CPAN
in the read-write mode.
=item TYPE
Following types are supported:
=over 4
=item pipe
Opens an unidirectional pipe or a bidirectional socket (depends on the DIRECTION)
between the current and the new process. ARGS can be a glob reference, then the
second end of the pipe or socket pair is connected to it, or C<handle =E<gt>
[handle_args...]>, where handle_args are an argument passed to the
L<AnyEvent::Handle|AnyEvent::Handle> constructor, which will be connected to the
second end of the pipe or socket. In the a case handle_args is in the form of
C<method =E<gt> [method_args...]> and method is AnyEvent::Handle method, then
this method is called with method_args, after the handle is instantiated.
Example:
\*STDOUT => ['pipe', '>', handle => [push_read => [line => \&reader]]]
=item open
Opens the specified HANDLE using open with DIRECTION and ARGS as its arguments.
Example:
0 => ['open', '<', '/dev/null']
=item decorate
Decorate every line written to the HANDLE by the child. The DIRECTION must be
C<E<gt>>. ARGS are in the form C<DECORATOR, OUTPUT>. OUTPUT is a glob reference
and specifies a file handle, into which decorated lines are written. Decorator is
a string or a code reference. If the decorator is a string, it is prepended to
every line written by the started process. If the DECORATOR is a code reference,
it is called for each line written to the HANDLE with that line as its argument
and its return value is written to the OUTPUT.
Example:
\*STDERR => ['decorate', '>', 'Child STDERR: ', \*STDERR]
=back
=back
=item code (optional, but must be specified either in new or run)
A code reference, which is executed in the newly started process.
=item args (optional)
Arguments past to a code reference specified as code argument when it is called.
=item on_completion (optional)
Callback, which is executed when the process finishes. It receives
AnyEvent::Process::Job instance as the first argument and exit code as the
second argument.
It is called after all AnyEvent::Handle callbacks specified in the fh_table.
=item watchdog_interval (in seconds, optional)
How often a watchdog shall be called. If undefined or set to 0, the watchdog
functionality is disabled.
=item on_watchdog (optional)
Watchdog callback, receives AnyEvent::Process::Job instance as its argument.
If it returns false value, the watched process is killed (see on_kill).
=item kill_interval (in seconds, optional)
Maximum time the process can run. After this time expires, the process is
killed.
=item on_kill (optional, sends SIGKILL by default)
Called, when the process shall be killed. Receives AnyEvent::Process::Job
instance as its argument.
=back
=head2 run
Run a process. Any argument specified to the constructor can be overridden here.
Returns AnyEvent::Process::Job, which represents the new process, or undef on an
error.
=over 4
=item Returned AnyEvent::Process::Job instance has following methods:
=over 4
=item pid
Returns PID of the process.
=item kill
Send signal specified as the argument to the process.
=item close
Close all pipes and socketpairs between this process and the child.
=back
=back
=head2 kill
Run the kill method of the latest created AnyEvent::Process::Job - sends signal
specified as its argument to the process.
=head2 pid
Run the pid method of the latest created AnyEvent::Process::Job - returns PID of
the process.
( run in 2.266 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )