AnyEvent-Process
view release on metacpan or search on metacpan
lib/AnyEvent/Process.pm view on Meta::CPAN
open my $fh, $args->[1], $args->[2];
unless (defined $fh) {
croak "Opening file failed: $!";
}
push @fh_table, [$handle, $fh];
} elsif ($args->[0] eq 'decorate') {
my $out = $args->[3];
unless (defined $out or ref $out eq 'GLOB') {
croak "Third argument of decorate must be a glob reference";
}
my ($my_fh, $child_fh) = portable_pipe;
unless (defined $my_fh && defined $child_fh) {
croak "Creating pipe failed: $!";
}
my $on_read;
my $decorator = $args->[2];
if (defined $decorator and ref $decorator eq '') {
$on_read = sub {
while ($_[0]->rbuf() =~ s/^(.*\n)//) {
print $out $decorator, $1;
}
};
} elsif (defined $decorator and ref $decorator eq 'CODE') {
$on_read = sub {
while ($_[0]->rbuf() =~ s/^(.*\n)//) {
print $out $decorator->($1);
}
};
} else {
croak "Second argument of decorate must be a string or code reference";
}
push @fh_table, [$handle, $child_fh];
push @handles, [$my_fh, [on_read => $on_read, on_eof => $callback_factory->()]];
} else {
croak "Unknown redirect type '$args->[0]'";
}
}
# Start child
my $pid = fork;
my $job;
unless (defined $pid) {
croak "Fork failed: $!";
} elsif ($pid == 0) {
# Duplicate FDs
foreach my $dup (@fh_table) {
open $dup->[0], '+>&', $dup->[1];
close $dup->[1];
}
# Close handles
foreach my $dup (@handles) {
close $dup->[0];
}
# Close other filedescriptors
if (defined $proc_args{close_all_fds_except}) {
my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
push @not_close, fileno $_->[0] foreach @fh_table;
AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
AnyEvent::Util::close_all_fds_except @not_close;
}
# Run the code
my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
exit ($rtn eq int($rtn) ? $rtn : 1);
} else {
AE::log info => "Forked new process $pid.";
$job = new AnyEvent::Process::Job($pid);
# Close FDs
foreach my $dup (@fh_table) {
AE::log trace => "Closing $dup->[1].";
close $dup->[1];
}
# Create handles
foreach my $handle (@handles) {
my (@hdl_args, @hdl_calls);
for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
if ($handle->[1][$i] eq 'on_eof') {
push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
} else {
push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
}
} else {
push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
}
}
AE::log trace => "Creating handle " . join ' ', @hdl_args;
my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
foreach my $call (@hdl_calls) {
no strict 'refs';
my $method = $call->[0];
AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
$hdl->$method(@{$call->[1]});
}
$job->_add_handle($hdl);
}
# Create callbacks
my $completion_cb = sub {
$job->_remove_timers();
AE::log info => "Process $job->{pid} finished with code $_[1].";
$set_on_completion_args->($job, $_[1]);
};
$job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));
$self->{job} = $job;
# Create watchdog and kill timers
my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
if (defined $proc_args{kill_interval}) {
my $kill_cb = sub {
$job->_remove_timers();
AE::log warn => "Process $job->{pid} is running too long, killing it.";
$on_kill->($job);
};
$job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
( run in 0.595 second using v1.01-cache-2.11-cpan-39bf76dae61 )