Mojo-Webqq

 view release on metacpan or  search on metacpan

lib/Mojo/Webqq/Run.pm  view on Meta::CPAN

package Mojo::Webqq::Run;
use List::Util qw(first);
use Mojo::Webqq::Base -base;
use bytes;
use Carp;
use Errno;
use Socket;
use Time::HiRes qw(time gettimeofday);
use Scalar::Util qw(blessed);
use Storable qw(thaw nfreeze);
use POSIX ":sys_wait_h";
use Mojo::Webqq::Log;
use Mojo::IOLoop;
use Mojo::Reactor;
has 'num_forks'  => sub { 0 };
has 'max_forks'  => sub { 0 };
has 'log'        => sub { Mojo::Webqq::Log->new };
has 'ioloop'     => sub { Mojo::IOLoop->singleton };
has [qw/reactor error is_child/];
 
our $VERSION = '0.3';
 
my $_obj  = undef;
 
BEGIN {
        *portable_pipe = sub () { my ($r, $w);
                pipe $r, $w or return;
                 
                ($r, $w);
        };
        *portable_socketpair = sub () {
                socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
                        or return;
                $fh1->autoflush(1);
                $fh2->autoflush(1);
                 
                ($fh1, $fh2)
        };      
}
 
sub new {my $class = shift; __PACKAGE__->singleton(@_) }
 
sub singleton {
        return $_obj if defined $_obj;
        my $class = shift;
        return $_obj = __PACKAGE__->_constructor(@_);
}
 
sub _constructor {
        my $proto = shift;
        my $class = ref($proto) || $proto;
        my $self  = $class->SUPER::new(@_);
 
        bless $self => $class;
         
        # install SIGCHLD handler
        $SIG{'CHLD'} = sub { _sig_chld($self, @_) };
         
        return $self;
}
 
sub log_level {
        my ($self, $level) = @_;
         
        $self->log->level($level) if defined $level;
         
        return $self->log->level;
}
 
sub spawn {
        my ($self, %opt) = @_;
         
        unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
                my $obj = __PACKAGE__->new;
                return $obj->spawn(%opt);
        }
         
        $self->error('');
         
        if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
                $self->error("Unable to spawn another subprocess: "
                        ."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
                );
                return 0;
        }
         
        # normalize and validate run parameters...
        my $proc = $self->_getRunStruct(\%opt);
        return 0 unless $self->_validateRunStruct($proc);
         
        $self->log->debug("Spawning command "
                ."timeout: "
                .($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
                ." : [$proc->{cmd}]"
        );
        my ($stdout_p, $stdout_c) = portable_socketpair;
        my ($stderr_p, $stderr_c) = portable_socketpair;
        my ($stdres_p, $stdres_c) = portable_socketpair;
         
        $proc->{time_started} = time;
        $proc->{running     } = 1;
        $proc->{hdr_stdout  } = $stdout_c;
        $proc->{hdr_stderr  } = $stderr_c;
        $proc->{hdr_stdres  } = $stdres_c;
         
        my $pid = fork;
         
        if ($pid) {
                # parent
                $self->num_forks($self->num_forks + 1);
                 
                $self->log->debug("Subprocess spawned as pid $pid.");
                 
                $proc->{pid} = $pid;

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.753 second using v1.00-cache-2.02-grep-82fe00e-cpan-1925d2aa809 )