AnyEvent-Process

 view release on metacpan or  search on metacpan

lib/AnyEvent/Process.pm  view on Meta::CPAN

			open my $fh, $args->[1], $args->[2];
			unless (defined $fh) {
				croak "Opening file failed: $!";
			}
			push @fh_table, [$handle, $fh];
		} elsif ($args->[0] eq 'decorate') {
			# Validate the output target for decorated lines: it must be a
			# glob-based filehandle reference (e.g. \*STDERR or a lexical
			# handle).  Both conditions have to hold -- joining them with
			# "or" would accept any defined value and never check the type.
			# reftype() is used instead of ref() so that blessed glob
			# handles (such as IO::Handle objects) are still accepted.
			my $out = $args->[3];
			require Scalar::Util;
			unless (defined $out and (Scalar::Util::reftype($out) // '') eq 'GLOB') {
				croak "Third argument of decorate must be a glob reference";
			}

			my ($my_fh, $child_fh) = portable_pipe;
			unless (defined $my_fh && defined $child_fh) {
				croak "Creating pipe failed: $!";
			}

			my $on_read;
			my $decorator = $args->[2];
			if (defined $decorator and ref $decorator eq '') {
				$on_read = sub {
					while ($_[0]->rbuf() =~ s/^(.*\n)//) {
						print $out $decorator, $1;
					}
				};
			} elsif (defined $decorator and ref $decorator eq 'CODE') {
				$on_read = sub {
					while ($_[0]->rbuf() =~ s/^(.*\n)//) {
						print $out $decorator->($1);
					}
				};
			} else {
				croak "Second argument of decorate must be a string or code reference";
			}

			push @fh_table, [$handle, $child_fh];
			push @handles,  [$my_fh, [on_read => $on_read, on_eof => $callback_factory->()]];
		} else {
			croak "Unknown redirect type '$args->[0]'";
		}
	}

	# Start child
	my $pid = fork;
	my $job;
	unless (defined $pid) {
		croak "Fork failed: $!";
	} elsif ($pid == 0) {
		# Duplicate FDs
		foreach my $dup (@fh_table) {
			open $dup->[0], '+>&', $dup->[1];
			close $dup->[1];
		}

		# Close handles
		foreach my $dup (@handles) {
			close $dup->[0];
		}

		# Close other filedescriptors
		if (defined $proc_args{close_all_fds_except}) {
			my @not_close = map fileno($_), @{$proc_args{close_all_fds_except}};
			AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
			push @not_close, fileno $_->[0] foreach @fh_table;

			AE::log trace => "Closing all other fds except: " . join ', ', @not_close;
			AnyEvent::Util::close_all_fds_except @not_close;
		}

		# Run the code
		my $rtn = $proc_args{code}->(@{$proc_args{args} // []});
		exit ($rtn eq int($rtn) ? $rtn : 1);
	} else {
		AE::log info => "Forked new process $pid.";

		# Construct the job object tracking the forked child.  The arrow
		# form is used rather than indirect-object syntax ("new Class(...)"),
		# which Perl parses heuristically and can mistake for a plain
		# function call when a sub named "new" is in scope.
		$job = AnyEvent::Process::Job->new($pid);

		# Close FDs
		foreach my $dup (@fh_table) {
			AE::log trace => "Closing $dup->[1].";
			close $dup->[1];
		}

		# Create handles
		foreach my $handle (@handles) {
			my (@hdl_args, @hdl_calls);
			for (my $i = 0; $i < $#{$handle->[1]}; $i += 2) {
				if (AnyEvent::Handle->can($handle->[1][$i]) and 'ARRAY' eq ref $handle->[1][$i+1]) {
					if ($handle->[1][$i] eq 'on_eof') {
						push @hdl_calls, [$handle->[1][$i], $callback_factory->($handle->[1][$i+1][0])];
					} else {
						push @hdl_calls, [$handle->[1][$i], $handle->[1][$i+1]];
					}
				} else {
					push @hdl_args, $handle->[1][$i] => $handle->[1][$i+1];
				}
			}
			AE::log trace => "Creating handle " . join ' ', @hdl_args;
			my $hdl = AnyEvent::Handle->new(fh => $handle->[0], @hdl_args);
			foreach my $call (@hdl_calls) {
				no strict 'refs';
				my $method = $call->[0];
				AE::log trace => "Calling handle method $method(" . join(', ', @{$call->[1]}) . ')';
				$hdl->$method(@{$call->[1]});
			}
			$job->_add_handle($hdl);
		}

		# Create callbacks
		my $completion_cb = sub {
			$job->_remove_timers();
			AE::log info => "Process $job->{pid} finished with code $_[1].";
			$set_on_completion_args->($job, $_[1]);
		};
		$job->_add_cb(AE::child $pid, $callback_factory->($completion_cb));

		$self->{job} = $job;

		# Create watchdog and kill timers
		my $on_kill = $proc_args{on_kill} // sub { $_[0]->kill(9) };
		if (defined $proc_args{kill_interval}) {
			my $kill_cb = sub { 
				$job->_remove_timers();
				AE::log warn => "Process $job->{pid} is running too long, killing it.";
				$on_kill->($job);
			};
			$job->_add_timer(AE::timer $proc_args{kill_interval}, 0, $kill_cb);



( run in 0.595 second using v1.01-cache-2.11-cpan-39bf76dae61 )