Mojo-Run

 view release on metacpan or  search on metacpan

lib/Mojo/Run.pm  view on Meta::CPAN

	# portable_pipe(): open an anonymous pipe.
	# Returns the (read, write) handle pair, or nothing when pipe() fails.
	*portable_pipe = sub () {
		my ($reader, $writer);

		return unless pipe($reader, $writer);

		return ($reader, $writer);
	};
	# portable_socketpair(): create a connected pair of AF_UNIX stream
	# sockets with autoflush enabled on both ends.
	# Returns the two handles, or nothing when socketpair() fails.
	*portable_socketpair = sub () {
		my ($left, $right);

		socketpair($left, $right, Socket::AF_UNIX(), Socket::SOCK_STREAM(), PF_UNSPEC)
			or return;

		# unbuffered writes on both ends
		$_->autoflush(1) for $left, $right;

		return ($left, $right);
	};	
}

# Public constructor. Ignores its invocant and always hands back the
# value of __PACKAGE__->singleton.
sub new {
	return __PACKAGE__->singleton;
}

# Returns the shared instance held in $_obj (declared outside this sub),
# building it through _constructor() on first use.
sub singleton {
	$_obj = __PACKAGE__->_constructor
		unless defined $_obj;

	return $_obj;
}

# Real constructor behind singleton().
#
# Accepts a class name or an existing object as invocant, builds the
# object through the parent class' new(), blesses it into the resolved
# class and installs a SIGCHLD handler that forwards to _sig_chld().
# Returns the new object.
sub _constructor {
	my ($invocant) = @_;

	# allow both Class->_constructor and $obj->_constructor
	my $class = ref($invocant) || $invocant;

	my $self = $class->SUPER::new;
	bless $self, $class;

	# install SIGCHLD handler; the closure keeps a reference to $self
	$SIG{CHLD} = sub { _sig_chld($self, @_) };

	return $self;
}

# Gets or sets the level of the object's logger.
#
# When a defined $level is passed it is forwarded to $self->log->level
# first; the logger's current level is returned in either case.
sub log_level {
	my $self  = shift;
	my $level = shift;

	if (defined $level) {
		$self->log->level($level);
	}

	return $self->log->level;
}

sub spawn {
	my ($self, %opt) = @_;
	
	unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
		my $obj = __PACKAGE__->new;
		return $obj->spawn(%opt);
	}
	
	$self->error('');
	
	if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
		$self->error("Unable to spawn another subprocess: "
			."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
		);
		return 0;
	}
	
	# normalize and validate run parameters...
	my $proc = $self->_getRunStruct(\%opt);
	return 0 unless $self->_validateRunStruct($proc);
	
	$self->log->debug("Spawning command "
		."timeout: "
		.($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds]", $proc->{exec_timeout}) : "none")
		." : [$proc->{cmd}]"
	);
	my ($stdout_p, $stdout_c) = portable_socketpair;
	my ($stderr_p, $stderr_c) = portable_socketpair;
	my ($stdres_p, $stdres_c) = portable_socketpair;
	
	$proc->{time_started} = time;
	$proc->{running     } = 1;
	$proc->{hdr_stdout  } = $stdout_c;
	$proc->{hdr_stderr  } = $stderr_c;
	$proc->{hdr_stdres  } = $stdres_c;
	
	my $pid = fork;
	
	if ($pid) {
		# parent
		$self->num_forks($self->num_forks + 1);
		
		$self->log->debug("Subprocess spawned as pid $pid.");
		
		$proc->{pid} = $pid;
		
		# exec timeout
		if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
			$self->log->debug(
				"[process $pid]: Setting execution timeout to " .
				sprintf("%-.3f seconds.", $proc->{exec_timeout})
			);
			my $timer = $self->ioloop->timer(
				$proc->{exec_timeout},
				sub { _timeout_cb($self, $pid) }
			);
	
			# save timer
			$proc->{id_timeout} = $timer;
		}
		
		$self->{_data}->{$pid} = $proc;
		
		close $stdout_p;
		close $stderr_p;
		close $stdres_p;
		
		$self->watch('stdout', $pid);
		$self->watch('stderr', $pid);
		$self->watch('stdres', $pid);
	} else {
		# child
		
		$self->is_child(1);
		
		close $stdout_c;
		close $stderr_c;
		close $stdres_c;
		
		# Stdio should not be tied.

lib/Mojo/Run.pm  view on Meta::CPAN

			$self->log->debug("[process $pid]: invoking exit_cb callback.");
			eval { $proc->{exit_cb}->($pid, $cb_d); };
			
			$self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
		} else {
			$self->log->error("[process $pid]: No exit_cb callback!");
		}
	}

	delete $self->{_data}->{$pid};
	$self->num_forks($self->num_forks - 1);
}

# SIGCHLD handler body: reaps every child that has already exited and
# passes its status on to cleanup().
#
# For each reaped pid, cleanup() is called with the pid, the exit code
# (status >> 8), the terminating signal number (status & 127) and the
# core-dump flag (status & 128). A debug line is logged when at least one
# child was reaped.
sub _sig_chld {
	my ($self) = @_;

	# This code runs from a signal handler, i.e. in the middle of whatever
	# was executing when the signal arrived. waitpid() overwrites $? and
	# may set $!, so confine those changes to this sub.
	local ($!, $?);

	# Use the fully qualified constant: no dependency on what POSIX was
	# asked to export, and no need to relax strict 'subs' (a bareword
	# WNOHANG would silently turn into a blocking waitpid).
	require POSIX;
	my $nohang = POSIX::WNOHANG();

	my $reaped = 0;
	while ((my $pid = waitpid(-1, $nohang)) > 0) {
		$reaped++;

		# decode the wait status before anything else can touch $?
		my $status   = $?;
		my $exit_val = $status >> 8;
		my $signum   = $status & 127;
		my $core     = $status & 128;

		# do process cleanup
		$self->cleanup($pid, $exit_val, $signum, $core);
	}

	$self->log->debug("SIGCHLD handler cleaned up after $reaped process(es).")
	  if $reaped > 0;
}

# Builds the per-process bookkeeping hash used by spawn().
#
# Starts from a fixed set of default fields and copies over every entry
# of the $opt hashref whose key is one of those fields; unknown keys are
# ignored. Returns a reference to the resulting hash.
sub _getRunStruct {
	my ($self, $opt) = @_;

	my %proc = (
		pid          => 0,
		cmd          => undef,
		param        => undef,
		error        => undef,
		stdout_cb    => undef,
		stderr_cb    => undef,
		exit_cb      => undef,
		exec_timeout => 0,
		buf_stdout   => '',
		buf_stderr   => '',
		buf_stdres   => '',
		hdr_stdout   => undef,
		hdr_stderr   => undef,
		hdr_stdres   => undef,
	);

	# apply user defined vars, known fields only
	for my $key (keys %$opt) {
		next unless exists $proc{$key};
		$proc{$key} = $opt->{$key};
	}

	return \%proc;
}

# Validates a run structure produced by _getRunStruct().
#
# Checks, in order: the command is defined; a plain-scalar command is not
# empty; a reference command is a CODE or ARRAY reference; stdout_cb,
# stderr_cb and exit_cb are code references when defined.
#
# On the first failed check the message is stored via $self->error() and
# an empty/undef value is returned. On success exec_timeout is coerced to
# a number in place and 1 is returned.
sub _validateRunStruct {
	my ($self, $s) = @_;

	my $cmd     = $s->{cmd};
	my $cmd_ref = ref $cmd;    # '' for undef and for plain scalars

	my $problem;
	if (!defined $cmd) {
		$problem = 'Undefined command.';
	}
	elsif ($cmd_ref eq '' && length($cmd) == 0) {
		$problem = 'Zero-length command.';
	}
	# plain string comparison instead of the deprecated smartmatch operator
	elsif ($cmd_ref ne '' && $cmd_ref ne 'CODE' && $cmd_ref ne 'ARRAY') {
		$problem = 'Command can be pure scalar, arrayref or coderef.';
	}
	elsif (defined $s->{stdout_cb} && ref($s->{stdout_cb}) ne 'CODE') {
		$problem = "STDOUT callback defined, but is not code reference.";
	}
	elsif (defined $s->{stderr_cb} && ref($s->{stderr_cb}) ne 'CODE') {
		$problem = "STDERR callback defined, but is not code reference.";
	}
	elsif (defined $s->{exit_cb} && ref($s->{exit_cb}) ne 'CODE') {
		$problem = "Process exit_cb callback defined, but is not code reference.";
	}

	if (defined $problem) {
		# Return unconditionally: do not depend on error() yielding a
		# true value to stop validation.
		$self->error($problem);
		return;
	}

	# exec timeout: force numeric value, silently mapping junk to 0
	{ no warnings; $s->{exec_timeout} += 0; }

	return 1;
}

sub _timeout_cb {
	my ($self, $pid) = @_;
	
	my $proc = $self->get_proc($pid);
	return 0 unless $proc;
	
	# drop timer (can't hurt...)
	if (defined $proc->{id_timeout}) {
		$self->ioloop->remove($proc->{id_timeout});
		$proc->{id_timeout} = undef;
	}

	# is process still alive?
	return 0 unless kill 0, $pid;

	$self->log->debug("[process $pid]: Execution timeout ("
		.sprintf("%-.3f seconds).", $proc->{exec_timeout})
		." Killing process.");

	$proc->{stderr} .= ";Execution timeout.";
	
	# kill the motherfucker!

	unless (CORE::kill(9, $pid)) {
		$self->log->warn("[process $pid]: Unable to kill process: $!");
	}
	$proc->{hard_kill} = 1;
	$self->cleanup($pid, 0, 9, 0);

	return 1;



( run in 0.944 second using v1.01-cache-2.11-cpan-df04353d9ac )