Mojo-Webqq

 view release on metacpan or  search on metacpan

lib/Mojo/Webqq/Run.pm  view on Meta::CPAN

                pipe $r, $w or return;
                 
                ($r, $w);
        };
        *portable_socketpair = sub () {
                socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
                        or return;
                $fh1->autoflush(1);
                $fh2->autoflush(1);
                 
                ($fh1, $fh2)
        };      
}
 
sub new {my $class = shift; __PACKAGE__->singleton(@_) }
 
sub singleton {
        return $_obj if defined $_obj;
        my $class = shift;
        return $_obj = __PACKAGE__->_constructor(@_);
}
 
sub _constructor {
        my $proto = shift;
        my $class = ref($proto) || $proto;
        my $self  = $class->SUPER::new(@_);
 
        bless $self => $class;
         
        # install SIGCHLD handler
        $SIG{'CHLD'} = sub { _sig_chld($self, @_) };
         
        return $self;
}
 
sub log_level {
        my ($self, $level) = @_;
         
        $self->log->level($level) if defined $level;
         
        return $self->log->level;
}
 
sub spawn {
        my ($self, %opt) = @_;
         
        unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
                my $obj = __PACKAGE__->new;
                return $obj->spawn(%opt);
        }
         
        $self->error('');
         
        if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
                $self->error("Unable to spawn another subprocess: "
                        ."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
                );
                return 0;
        }
         
        # normalize and validate run parameters...
        my $proc = $self->_getRunStruct(\%opt);
        return 0 unless $self->_validateRunStruct($proc);
         
        $self->log->debug("Spawning command "
                ."timeout: "
                .($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
                ." : [$proc->{cmd}]"
        );
        my ($stdout_p, $stdout_c) = portable_socketpair;
        my ($stderr_p, $stderr_c) = portable_socketpair;
        my ($stdres_p, $stdres_c) = portable_socketpair;
         
        $proc->{time_started} = time;
        $proc->{running     } = 1;
        $proc->{hdr_stdout  } = $stdout_c;
        $proc->{hdr_stderr  } = $stderr_c;
        $proc->{hdr_stdres  } = $stdres_c;
         
        my $pid = fork;
         
        if ($pid) {
                # parent
                $self->num_forks($self->num_forks + 1);
                 
                $self->log->debug("Subprocess spawned as pid $pid.");
                 
                $proc->{pid} = $pid;
                 
                # exec timeout
                if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
                        $self->log->debug(
                                "[process $pid]: Setting execution timeout to " .
                                sprintf("%-.3f seconds.", $proc->{exec_timeout})
                        );
                        my $timer = $self->ioloop->timer(
                                $proc->{exec_timeout},
                                sub { _timeout_cb($self, $pid) }
                        );
         
                        # save timer
                        $proc->{id_timeout} = $timer;
                }
                 
                $self->{_data}->{$pid} = $proc;
                 
                close $stdout_p;
                close $stderr_p;
                close $stdres_p;
                 
                $self->watch('stdout', $pid);
                $self->watch('stderr', $pid);
                $self->watch('stdres', $pid);
        } else {
                # child
                 
                $self->is_child(1);
                 
                close $stdout_c;
                close $stderr_c;
                close $stdres_c;
                 
                # Stdio should not be tied.

lib/Mojo/Webqq/Run.pm  view on Meta::CPAN

                        eval { $proc->{exit_cb}->($pid, $cb_d); };
                         
                        $self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
                } else {
                        $self->log->error("[process $pid]: No exit_cb callback!");
                }
        }
 
        delete $self->{_data}->{$pid};
        $self->num_forks($self->num_forks - 1);
}
 
sub _sig_chld {
        my ($self) = @_;
 
        no strict 'subs';
         
        my $i = 0;
        while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
                $i++;
                my $exit_val = $? >> 8;
                my $signum   = $? & 127;
                my $core     = $? & 128;
 
                # do process cleanup
                $self->cleanup($pid, $exit_val, $signum, $core);
        }
         
        $self->log->debug("SIGCHLD handler cleaned up after $i process(es).")
          if $i > 0;
}
 
sub _getRunStruct {
        my ($self, $opt) = @_;
         
        my $s = {
                pid          => 0,
                cmd          => undef,
                param        => undef,
                error        => undef,
                stdout_cb    => undef,
                stderr_cb    => undef,
                exit_cb      => undef,
                is_timeout   => undef,
                exec_timeout => 0,
                buf_stdout   => '',
                buf_stderr   => '',
                buf_stdres   => '',
                hdr_stdout   => undef,
                hdr_stderr   => undef,
                hdr_stdres   => undef,
        };
 
        # apply user defined vars...
        $s->{$_} = $opt->{$_}
                for grep { exists $s->{$_} } keys %$opt;
 
        return $s;
}
 
sub _validateRunStruct {
        my ($self, $s) = @_;
 
        # command?
        $self->error('Undefined command.') and return
                unless defined $s->{cmd};
         
        # check command...
        my $cmd_ref = ref $s->{cmd};
        $self->error('Zero-length command.') and return
                if $cmd_ref eq '' && length $s->{cmd} == 0;
         
        $self->error('Command can be pure scalar, arrayref or coderef.') and return
                if $cmd_ref ne '' && not defined first {$cmd_ref eq $_} ('CODE', 'ARRAY');
 
        # callbacks...
        $self->error("STDOUT callback defined, but is not code reference.") and return
                if defined $s->{stdout_cb} && ref $s->{stdout_cb} ne 'CODE';
         
        $self->error("STDERR callback defined, but is not code reference.") and return
                if defined $s->{stderr_cb} && ref $s->{stderr_cb} ne 'CODE';
         
        $self->error("Process exit_cb callback defined, but is not code reference.") and return
                if defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE';
 
        # exec timeout
        { no warnings; $s->{exec_timeout} += 0; }
 
        return 1;
}
 
sub _timeout_cb {
        my ($self, $pid) = @_;
         
        my $proc = $self->get_proc($pid);
        return 0 unless $proc;
         
        # drop timer (can't hurt...)
        if (defined $proc->{id_timeout}) {
                $self->ioloop->remove($proc->{id_timeout});
                $proc->{id_timeout} = undef;
        }
 
        # is process still alive?
        return 0 unless kill 0, $pid;
 
        $self->log->debug("[process $pid]: Execution timeout ("
                .sprintf("%-.3f seconds).", $proc->{exec_timeout})
                ." Killing process.");
 
        $proc->{stderr} .= ";Execution timeout.";
        $proc->{is_timeout} = 1;
         
        # kill the motherfucker!
 
        unless (CORE::kill(9, $pid)) {
                $self->log->warn("[process $pid]: Unable to kill process: $!");
        }
        $proc->{hard_kill} = 1;
        $self->cleanup($pid, 0, 9, 0);
 



( run in 2.246 seconds using v1.01-cache-2.11-cpan-2398b32b56e )