Parallel-Manager
view release on metacpan or search on metacpan
lib/Parallel/Manager.pm view on Meta::CPAN
package Parallel::Manager;
# ABSTRACT: fork threads to run some callback together
#------------------------------------------------------------------------------
# å è½½ç³»ç»æ¨¡å
#------------------------------------------------------------------------------
use Moose;
use namespace::autoclean;
use POSIX qw/WNOHANG/;
#------------------------------------------------------------------------------
# å®ä¹æ¨¡åéç¨æ¹æ³å屿§
#------------------------------------------------------------------------------
has handler => (is => 'rw', isa => 'CodeRef', required => 1,);
has workers => (is => 'ro', isa => 'ArrayRef', required => 1,);
has thread => (is => 'rw', isa => 'Int', default => 5,);
# çå¾
åè¿ç¨ä»¥åè¿å
¥ä¸ä¸ä¸ªåè¿ç¨ç卿
has poll_interval => (is => 'rw', isa => 'Num', default => 0.5,);
# è¿å
¥ä¸ä¸ä¸ªéåç卿
has wait_interval => (is => 'rw', isa => 'Num', default => 0.5,);
#------------------------------------------------------------------------------
# å¼å¯å¹¶åè°åº¦ä»»å¡
#------------------------------------------------------------------------------
sub run {
my $self = shift;
# æ ¹æ®è´è½½éèªå¨è®¾å®çº¿ç¨æ°
@{$self->workers} > $self->thread ? $self->thread : $self->thread(scalar @{$self->workers});
# æç
§çº¿ç¨æ°å¡è¿è´è½½ä»»å¡ï¼æåæ $self->thread 个åéå
my $i = 0;
my @threads;
for my $job (@{$self->workers}) {
push(@{$threads[$i % $self->thread]}, $job);
$i++;
}
# åå
¥è¿è¡åé©å彿°
if ($self->{before_run_code}) {
$self->{before_run_code}->($self->{before_run_vars});
}
# fork 线ç¨å¹¶åè°åº¦ä»»å¡
my %childPids;
for (my $j = 0; $j < $self->thread; $j++) {
if (my $pid = fork) {
$childPids{$pid} = undef;
}
else {
for my $work (@{$threads[$j]}) {
if ($self->{before_job_run_code}) {
$self->{before_job_run_code}->($self->{before_job_run_vars});
}
eval { $self->handler->($work) } or die "Can't handle $work: $@\n";
if ($self->{after_job_run_code}) {
$self->{after_job_run_code}->($self->{after_job_run_vars});
}
sleep $self->poll_interval;
}
exit;
}
sleep $self->wait_interval;
}
# 弿¥éé»å¡çå¾
å¹¶åä»»å¡è¿è¡ç»æ
my $exitPid;
while (keys(%childPids)) {
while (($exitPid = waitpid(-1, WNOHANG)) > 0) {
delete($childPids{$exitPid});
sleep $self->wait_interval;
}
( run in 0.833 second using v1.01-cache-2.11-cpan-39bf76dae61 )