AnyEvent-Process

 view release on metacpan or  search on metacpan

lib/AnyEvent/Process.pm  view on Meta::CPAN

package AnyEvent::Process::Job;
use strict;

# Constructor. Stores the PID of the child process and starts with empty
# lists of callbacks (cbs), handles and timers.
#
#   $class - package name to bless the new object into
#   $pid   - PID of the process this job represents
#
# Returns the newly blessed job object.
sub new {
	my $class = shift;
	my $child_pid = shift;

	my %state = (
		pid     => $child_pid,
		cbs     => [],
		handles => [],
		timers  => [],
	);

	return bless \%state, $class;
}

# Sends a signal to the process stored in this job.
#
#   $signal - signal name or number, handed unchanged to the builtin kill
#
# Returns whatever the builtin kill returns.
sub kill {
	my $self = shift;
	my $signal = shift;

	my $target_pid = $self->{pid};

	# The builtin is named explicitly because this sub shares its name.
	return CORE::kill($signal, $target_pid);
}

# Accessor: returns the PID stored in this job.
sub pid {
	my ($self) = @_;

	return $self->{pid};
}

# Appends $cb to the job's list of callbacks so that the job keeps a
# reference to it. Returns the new number of stored callbacks.
sub _add_cb {
	my $self = shift;
	my $watcher = shift;

	my $list = $self->{cbs} //= [];
	return push @$list, $watcher;
}

# Appends $handle to the job's list of handles so that the job keeps a
# reference to it. Returns the new number of stored handles.
sub _add_handle {
	my $self = shift;
	my $new_handle = shift;

	my $list = $self->{handles} //= [];
	return push @$list, $new_handle;
}

# Appends $timer to the job's list of timers so that the job keeps a
# reference to it. Returns the new number of stored timers.
sub _add_timer {
	my $self = shift;
	my $new_timer = shift;

	my $list = $self->{timers} //= [];
	return push @$list, $new_timer;
}

# Drops the job's list of callbacks by undefining the 'cbs' slot.
# The key stays in the object with an undefined value.
sub _remove_cbs {
	my ($self) = @_;

	return undef $self->{cbs};
}

# Cancels all timers stored in the job: every element of the 'timers'
# list is undefined in place, then the list slot itself is undefined.
sub _remove_timers {
	my ($self) = @_;

	# The loop variable aliases each element, so this clears the slots
	# of the stored array itself.
	for my $timer (@{ $self->{timers} }) {
		undef $timer;
	}

	return undef $self->{timers};
}

# Releases all handles stored in the job: every element of the 'handles'
# list is undefined in place, then the list slot itself is undefined.
sub close {
	my ($self) = @_;

	# The loop variable aliases each element, so this clears the slots
	# of the stored array itself.
	for my $handle (@{ $self->{handles} }) {
		undef $handle;
	}

	return undef $self->{handles};
}

package AnyEvent::Process;
use strict;

use AnyEvent::Handle;
use AnyEvent::Util;
use AnyEvent;
use Carp;

our @proc_args = qw(fh_table code on_completion args watchdog_interval on_watchdog kill_interval on_kill close_all_fds_except);
our $VERSION = '0.02';

my $nop = sub {};

# Runs the event loop until a postponed callback fires: a condition
# variable is created, a postponed callback is scheduled to send on it,
# and the sub blocks in recv until that happens.
# Arguments are ignored; the result of recv is returned.
sub _yield {
	my $done = AE::cv;

	AE::postpone {
		$done->send;
	};

	return $done->recv;
}

# Create a callback factory. This is needed to execute on_completion after all
# other callbacks. 
sub _create_callback_factory {
	my $on_completion = shift // $nop;
	my $counter = 0;
	my @on_completion_args;

	my $factory = sub {
		my $func = shift // $nop;

		$counter++;
		return sub {
			my ($err, $rtn);

			eval {
				$rtn = $func->(@_);
			}; $err = $@;

			if (--$counter == 0) {
				eval {
					$on_completion->(@on_completion_args);
				}; $err = $err || $@;
				$on_completion_args[0]->_remove_cbs;
			}

			if ($err) {
				croak $err;
			}

			return $rtn;
		}
	};
	my $set_on_completion_args = sub {

lib/AnyEvent/Process.pm  view on Meta::CPAN

			open $dup->[0], '+>&', $dup->[1];
			close $dup->[1];
		}

		# Close handles
		foreach my $dup (@handles) {
			close $dup->[0];
		}

		# Close other file descriptors
		if (defined $proc_args{close_all_fds_except}) {
			my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
			AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
			push @not_close, fileno $_->[0] foreach @fh_table;

			AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
			AnyEvent::Util::close_all_fds_except @not_close;
		}

		# Run the code
		my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
		exit ($rtn eq int($rtn) ? $rtn : 1);
	} else {
		AE::log info => "Forked new process $pid.";

		$job = new AnyEvent::Process::Job($pid);

		# Close FDs
		foreach my $dup (@fh_table) {
			AE::log trace => "Closing $dup->[1].";
			close $dup->[1];
		}

		# Create handles
		foreach my $handle (@handles) {
			my (@hdl_args, @hdl_calls);
			for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
				if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
					if ($handle->[1][$i] eq 'on_eof') {
						push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
					} else {
						push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
					}
				} else {
					push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
				}
			}
			AE::log trace => "Creating handle " . join ' ', @hdl_args;
			my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
			foreach my $call (@hdl_calls) {
				no strict 'refs';
				my $method = $call->[0];
				AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
				$hdl->$method(@{$call->[1]});
			}
			$job->_add_handle($hdl);
		}

		# Create callbacks
		my $completion_cb = sub {
			$job->_remove_timers();
			AE::log info => "Process $job->{pid} finished with code $_[1].";
			$set_on_completion_args->($job, $_[1]);
		};
		$job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));

		$self->{job} = $job;

		# Create watchdog and kill timers
		my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
		if (defined $proc_args{kill_interval}) {
			my $kill_cb = sub { 
				$job->_remove_timers();
				AE::log warn => "Process $job->{pid} is running too long, killing it.";
				$on_kill->($job);
			};
			$job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);
		}
		if (defined $proc_args{watchdog_interval} or defined $proc_args{on_watchdog}) {
			unless (defined $proc_args{watchdog_interval} &&
				defined $proc_args{on_watchdog}) {
				croak "Both or none of watchdog_interval and on_watchdog must be defined";
			}

			my $watchdog_cb = sub {
				AE::log info => "Executing watchdog for process $job->{pid}.";
				unless ($proc_args{on_watchdog}->($job)) {
					$job->_remove_timers();
					AE::log warn => "Watchdog for process $job->{pid} failed, killing it.";
					$on_kill->($job);
				}
			};
			$job->_add_timer(AE::timer $proc_args{watchdog_interval}, $proc_args{watchdog_interval}, $watchdog_cb);
		}
	}
	
	# We need this to allow AE to collect pending signals and prevent the accumulation of zombies
	$self->_yield;

	return $job;
}

# Sends a signal to the started process by delegating to the job object
# stored in $self->{job}.
#
#   $signal - signal to send; defaults to 15 when undefined
#
# Croaks with 'No process was started' if no job is stored.
# Returns the result of the job's kill method.
sub kill {
	my $self = shift;
	my $signal = shift;

	my $job = $self->{job};
	unless (defined $job) {
		croak('No process was started');
	}

	$signal = 15 unless defined $signal;
	return $job->kill($signal);
}

# Returns the PID of the started process by delegating to the job object
# stored in $self->{job}.
# Croaks with 'No process was started' if no job is stored.
sub pid {
	my ($self) = @_;

	my $job = $self->{job};
	croak('No process was started') unless defined $job;

	return $job->pid;
}

# Closes the handles of the started process by delegating to the close
# method of the job object stored in $self->{job}.
# Croaks with 'No process was started' if no job is stored.
# Returns the result of the job's close method.
sub close {
	my ($self) = @_;

	my $job = $self->{job};
	croak('No process was started') unless defined $job;

	return $job->close;
}

1;

__END__

=head1 NAME

AnyEvent::Process - Start a process and watch for events 

=head1 SYNOPSIS

  use AnyEvent::Process;

  my $proc = new AnyEvent::Process(
    fh_table => [
      # Connect OUTPIPE file handle to STDIN of a new process
      \*STDIN  => ['pipe', '<', \*OUTPIPE],
      # Connect INPIPE file handle to STDOUT of a new process
      \*STDOUT => ['pipe', '>', \*INPIPE],
      # Print everything written to STDERR by a new process to STDERR of current
      # process, but prefix every line with 'bc ERROR: '
      \*STDERR => ['decorate', '>', 'bc ERROR: ', \*STDERR]
    ],
    # We don't want to wait longer than 10 seconds, so kill bc after that time
    kill_interval => 10,
    # Execute bc in a new process
    code => sub {
      exec 'bc', '-q';
    });
  
  # Start bc in a new process



( run in 0.975 second using v1.01-cache-2.11-cpan-02777c243ea )