AnyEvent-ProcessPool
view release on metacpan or search on metacpan
lib/AnyEvent/ProcessPool/Process.pm view on Meta::CPAN
pending => [],
}, $class;
}
# Destructor: releases the worker handle and fails any tasks still waiting.
#
# Closes $self->{ps} when one is set, then calls croak() on every true entry
# of the pending list so that nobody waiting on one of those condvars is left
# without an answer.
sub DESTROY {
    my ($self) = @_;

    my $handle = $self->{ps};
    $handle->close if $handle;

    # Nothing more to do unless the pending queue is a reference.
    my $queue = $self->{pending};
    return unless ref $queue;

    for my $waiting (@$queue) {
        next unless $waiting;
        $waiting->croak('AnyEvent::ProcessPool::Process went out of scope with pending tasks');
    }
}
# Returns the pid reported by the worker handle while the worker is running.
#
# When the worker is not running there is no explicit return: the sub yields
# the (false) value produced by is_running.
sub pid {
    my ($self) = @_;
    $self->is_running and return $self->{ps}->pid;
}
# True when a "started" condvar exists and reports itself ready.
#
# Returns false when $self->{started} is undefined, otherwise whatever the
# condvar's ready() returns.
sub is_running {
    my ($self) = @_;
    my $started = $self->{started};
    return defined($started) && $started->ready;
}
# Blocks until the worker has started, launching it first if necessary.
#
# Calls start() when is_running is false, then waits on the "started"
# condvar and returns the result of its recv().
sub await {
    my ($self) = @_;

    if (!$self->is_running) {
        $self->start;
    }

    return $self->{started}->recv;
}
# Shuts the worker down and resets the per-process state.
#
# Does nothing unless $self->{process} is defined. Otherwise closes the
# worker handle (when there is one) and clears the started, process and ps
# slots so that is_running reports false afterwards.
#
# Returns nothing.
sub stop {
    my $self = shift;

    return unless defined $self->{process};

    # The handle is stored separately from the process object, so it may be
    # missing even though {process} is set; DESTROY guards the same call.
    $self->{ps}->close if $self->{ps};

    undef $self->{started};
    undef $self->{process};
    undef $self->{ps};

    return;
}
# Launches the worker process and wires up its event callbacks.
#
# Replaces $self->{started} with a fresh condvar, stores a new
# AnyEvent::Open3::Simple object in $self->{process} and uses it to run
# "$perl $self->{include} $cmd". Returns whatever that run() call returns.
#
# Callbacks:
#   on_start  - signals the "started" condvar.
#   on_stdout - decodes one line of worker output into a task and delivers it
#               to the oldest pending condvar; stops the worker once a
#               request limit is configured and its counter reaches zero.
#   on_stderr - relays worker stderr lines as warnings.
#   on_error  - dies with the launch error.
#   on_signal / on_fail - warn and stop the worker.
#
# NOTE(review): every callback closes over $self while being held (through
# the Open3 object) in $self->{process}; presumably this keeps the object
# alive until stop() clears that slot - confirm this is intended.
sub start {
    my $self = shift;

    $self->{started} = AE::cv;

    $self->{process} = AnyEvent::Open3::Simple->new(
        on_start => sub {
            $self->{started}->send;
        },
        on_stdout => sub {
            my ($ps, $line) = @_;

            # Output with nobody waiting for it cannot be delivered; drop it
            # instead of calling send() on an undefined value.
            my $pending = $self->{pending};
            unless ($pending && @$pending) {
                warn "AnyEvent::ProcessPool::Worker: discarding unexpected output (no pending task)\n";
                return;
            }

            my $task = AnyEvent::ProcessPool::Task->decode($line);
            my $cv   = shift @$pending;
            $cv->send($task) if $cv;

            # The request counter lives in the handle's user data; it is set
            # up by the run() callback below.
            if ($self->{limit} && $ps->user->{reqs} <= 0) {
                $self->stop;
            }
        },
        on_stderr => sub {
            warn "AnyEvent::ProcessPool::Worker: $_[1]\n";
        },
        on_error => sub {
            die "error launching worker process: $_[0]";
        },
        on_signal => sub {
            warn "worker terminated in response to signal: $_[1]";
            $self->stop;
        },
        on_fail => sub {
            warn "worker terminated with non-zero exit status: $_[1]";
            $self->stop;
        },
    );

    return $self->{process}->run("$perl $self->{include} $cmd", sub {
        my $ps = shift;

        # Seed the per-worker request budget when a limit is configured.
        $ps->user({reqs => $self->{limit}}) if $self->{limit};
        $self->{ps} = $ps;
    });
}
# Submits a task to the worker and returns a condvar for its result.
#
# Waits for the worker via await(), queues a new condvar on the pending
# list, writes the encoded task to the worker handle and, when a request
# limit is configured, decrements the handle's remaining-request counter.
sub run {
    my $self = shift;
    my $task = shift;

    $self->await;

    my $reply = AE::cv;
    push @{ $self->{pending} }, $reply;

    my $handle = $self->{ps};
    $handle->say($task->encode);

    if ($self->{limit}) {
        --$handle->user->{reqs};
    }

    return $reply;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::ProcessPool::Process - Manages an individual worker process
=head1 VERSION
version 0.07
=head1 AUTHOR
Jeff Ober <sysread@fastmail.fm>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2017 by Jeff Ober.
This is free software; you can redistribute it and/or modify it under
( run in 1.670 second using v1.01-cache-2.11-cpan-5a3173703d6 )