Mojo-Run
view release on metacpan or search on metacpan
lib/Mojo/Run.pm view on Meta::CPAN
*portable_pipe = sub () { my ($r, $w);
pipe $r, $w or return;
($r, $w);
};
*portable_socketpair = sub () {
socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
or return;
$fh1->autoflush(1);
$fh2->autoflush(1);
($fh1, $fh2)
};
}
sub new { __PACKAGE__->singleton }
sub singleton {
return $_obj if defined $_obj;
return $_obj = __PACKAGE__->_constructor;
}
sub _constructor {
my $proto = shift;
my $class = ref($proto) || $proto;
my $self = $class->SUPER::new;
bless $self => $class;
# install SIGCHLD handler
$SIG{'CHLD'} = sub { _sig_chld($self, @_) };
return $self;
}
sub log_level {
my ($self, $level) = @_;
$self->log->level($level) if defined $level;
return $self->log->level;
}
sub spawn {
my ($self, %opt) = @_;
unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
my $obj = __PACKAGE__->new;
return $obj->spawn(%opt);
}
$self->error('');
if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
$self->error("Unable to spawn another subprocess: "
."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
);
return 0;
}
# normalize and validate run parameters...
my $proc = $self->_getRunStruct(\%opt);
return 0 unless $self->_validateRunStruct($proc);
$self->log->debug("Spawning command "
."timeout: "
.($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
." : [$proc->{cmd}]"
);
my ($stdout_p, $stdout_c) = portable_socketpair;
my ($stderr_p, $stderr_c) = portable_socketpair;
my ($stdres_p, $stdres_c) = portable_socketpair;
$proc->{time_started} = time;
$proc->{running } = 1;
$proc->{hdr_stdout } = $stdout_c;
$proc->{hdr_stderr } = $stderr_c;
$proc->{hdr_stdres } = $stdres_c;
my $pid = fork;
if ($pid) {
# parent
$self->num_forks($self->num_forks + 1);
$self->log->debug("Subprocess spawned as pid $pid.");
$proc->{pid} = $pid;
# exec timeout
if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
$self->log->debug(
"[process $pid]: Setting execution timeout to " .
sprintf("%-.3f seconds.", $proc->{exec_timeout})
);
my $timer = $self->ioloop->timer(
$proc->{exec_timeout},
sub { _timeout_cb($self, $pid) }
);
# save timer
$proc->{id_timeout} = $timer;
}
$self->{_data}->{$pid} = $proc;
close $stdout_p;
close $stderr_p;
close $stdres_p;
$self->watch('stdout', $pid);
$self->watch('stderr', $pid);
$self->watch('stdres', $pid);
} else {
# child
$self->is_child(1);
close $stdout_c;
close $stderr_c;
close $stdres_c;
# Stdio should not be tied.
lib/Mojo/Run.pm view on Meta::CPAN
$self->log->debug("[process $pid]: invoking exit_cb callback.");
eval { $proc->{exit_cb}->($pid, $cb_d); };
$self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
} else {
$self->log->error("[process $pid]: No exit_cb callback!");
}
}
delete $self->{_data}->{$pid};
$self->num_forks($self->num_forks - 1);
}
sub _sig_chld {
my ($self) = @_;
no strict 'subs';
my $i = 0;
while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
$i++;
my $exit_val = $? >> 8;
my $signum = $? & 127;
my $core = $? & 128;
# do process cleanup
$self->cleanup($pid, $exit_val, $signum, $core);
}
$self->log->debug("SIGCHLD handler cleaned up after $i process(es).")
if $i > 0;
}
sub _getRunStruct {
my ($self, $opt) = @_;
my $s = {
pid => 0,
cmd => undef,
param => undef,
error => undef,
stdout_cb => undef,
stderr_cb => undef,
exit_cb => undef,
exec_timeout => 0,
buf_stdout => '',
buf_stderr => '',
buf_stdres => '',
hdr_stdout => undef,
hdr_stderr => undef,
hdr_stdres => undef,
};
# apply user defined vars...
$s->{$_} = $opt->{$_}
for grep { exists $s->{$_} } keys %$opt;
return $s;
}
sub _validateRunStruct {
my ($self, $s) = @_;
# command?
$self->error('Undefined command.') and return
unless defined $s->{cmd};
# check command...
my $cmd_ref = ref $s->{cmd};
$self->error('Zero-length command.') and return
if $cmd_ref eq '' && length $s->{cmd} == 0;
$self->error('Command can be pure scalar, arrayref or coderef.') and return
if $cmd_ref ne '' && not $cmd_ref ~~ ['CODE', 'ARRAY'];
# callbacks...
$self->error("STDOUT callback defined, but is not code reference.") and return
if defined $s->{stdout_cb} && ref $s->{stdout_cb} ne 'CODE';
$self->error("STDERR callback defined, but is not code reference.") and return
if defined $s->{stderr_cb} && ref $s->{stderr_cb} ne 'CODE';
$self->error("Process exit_cb callback defined, but is not code reference.") and return
if defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE';
# exec timeout
{ no warnings; $s->{exec_timeout} += 0; }
return 1;
}
sub _timeout_cb {
my ($self, $pid) = @_;
my $proc = $self->get_proc($pid);
return 0 unless $proc;
# drop timer (can't hurt...)
if (defined $proc->{id_timeout}) {
$self->ioloop->remove($proc->{id_timeout});
$proc->{id_timeout} = undef;
}
# is process still alive?
return 0 unless kill 0, $pid;
$self->log->debug("[process $pid]: Execution timeout ("
.sprintf("%-.3f seconds).", $proc->{exec_timeout})
." Killing process.");
$proc->{stderr} .= ";Execution timeout.";
# kill the motherfucker!
unless (CORE::kill(9, $pid)) {
$self->log->warn("[process $pid]: Unable to kill process: $!");
}
$proc->{hard_kill} = 1;
$self->cleanup($pid, 0, 9, 0);
return 1;
( run in 0.944 second using v1.01-cache-2.11-cpan-df04353d9ac )