Mojo-Run

 view release on metacpan or  search on metacpan

lib/Mojo/Run.pm  view on Meta::CPAN

package Mojo::Run;

use Mojo::Base -base;

use bytes;
use Carp;
use Errno;
use Socket;
use Time::HiRes qw(time gettimeofday);
use Scalar::Util qw(blessed);
use Storable qw(thaw nfreeze);
use POSIX ":sys_wait_h";
use Mojo::Log;
use Mojo::IOLoop;
use Data::Dumper;
use Mojo::Reactor;
has 'num_forks'  => sub { 0 };
has 'max_forks'  => sub { 0 };
has 'log'        => sub { Mojo::Log   ->new };
has 'ioloop'     => sub { Mojo::IOLoop->new };
has [qw/reactor error is_child/];

our $VERSION = '0.3';

my $_obj  = undef;

BEGIN {
	*portable_pipe = sub () { my ($r, $w);
		pipe $r, $w or return;
		
		($r, $w);
	};
	*portable_socketpair = sub () {
		socketpair my $fh1, my $fh2, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC
			or return;
		$fh1->autoflush(1);
		$fh2->autoflush(1);
		
		($fh1, $fh2)
	};	
}

sub new { __PACKAGE__->singleton }

sub singleton {
	return $_obj if defined $_obj;
	return $_obj = __PACKAGE__->_constructor;
}

sub _constructor {
	my $proto = shift;
	my $class = ref($proto) || $proto;
	my $self  = $class->SUPER::new;

	bless $self => $class;
	
	# install SIGCHLD handler
	$SIG{'CHLD'} = sub { _sig_chld($self, @_) };
	
	return $self;
}

sub log_level {
	my ($self, $level) = @_;
	
	$self->log->level($level) if defined $level;
	
	return $self->log->level;
}

sub spawn {
	my ($self, %opt) = @_;
	
	unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
		my $obj = __PACKAGE__->new;
		return $obj->spawn(%opt);
	}
	
	$self->error('');
	
	if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
		$self->error("Unable to spawn another subprocess: "
			."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
		);
		return 0;
	}
	
	# normalize and validate run parameters...
	my $proc = $self->_getRunStruct(\%opt);
	return 0 unless $self->_validateRunStruct($proc);
	
	$self->log->debug("Spawning command "
		."timeout: "
		.($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
		." : [$proc->{cmd}]"
	);
	my ($stdout_p, $stdout_c) = portable_socketpair;
	my ($stderr_p, $stderr_c) = portable_socketpair;
	my ($stdres_p, $stdres_c) = portable_socketpair;
	
	$proc->{time_started} = time;
	$proc->{running     } = 1;
	$proc->{hdr_stdout  } = $stdout_c;
	$proc->{hdr_stderr  } = $stderr_c;
	$proc->{hdr_stdres  } = $stdres_c;
	
	my $pid = fork;
	
	if ($pid) {
		# parent
		$self->num_forks($self->num_forks + 1);
		
		$self->log->debug("Subprocess spawned as pid $pid.");
		
		$proc->{pid} = $pid;

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.466 second using v1.00-cache-2.02-grep-82fe00e-cpan-3b7f77b76a6c )