Parallel-Manager

 view release on metacpan or  search on metacpan

lib/Parallel/Manager.pm  view on Meta::CPAN

package Parallel::Manager;

# ABSTRACT: fork threads to run some callback together
#------------------------------------------------------------------------------
# 加载系统模块
#------------------------------------------------------------------------------
use Moose;
use namespace::autoclean;
use POSIX qw/WNOHANG/;

#------------------------------------------------------------------------------
# 定义模块通用方法和属性
#------------------------------------------------------------------------------
has handler => (is => 'rw', isa => 'CodeRef', required => 1,);

has workers => (is => 'ro', isa => 'ArrayRef', required => 1,);

has thread => (is => 'rw', isa => 'Int', default => 5,);

# 等待子进程以及进入下一个子进程的周期
has poll_interval => (is => 'rw', isa => 'Num', default => 0.5,);

# 进入下一个队列的周期
has wait_interval => (is => 'rw', isa => 'Num', default => 0.5,);

#------------------------------------------------------------------------------
# 开启并发调度任务
#------------------------------------------------------------------------------
sub run {
  my $self = shift;

  # 根据负载量自动设定线程数
  @{$self->workers} > $self->thread ? $self->thread : $self->thread(scalar @{$self->workers});

  # 按照线程数塞进负载任务,拆分成 $self->thread 个子队列
  my $i = 0;
  my @threads;
  for my $job (@{$self->workers}) {
    push(@{$threads[$i % $self->thread]}, $job);
    $i++;
  }

  # 切入运行前钩子函数
  if ($self->{before_run_code}) {
    $self->{before_run_code}->($self->{before_run_vars});
  }

  # fork 线程并发调度任务
  my %childPids;
  for (my $j = 0; $j < $self->thread; $j++) {
    if (my $pid = fork) {
      $childPids{$pid} = undef;
    }
    else {
      for my $work (@{$threads[$j]}) {
        if ($self->{before_job_run_code}) {
          $self->{before_job_run_code}->($self->{before_job_run_vars});
        }
        eval { $self->handler->($work) } or die "Can't handle $work: $@\n";
        if ($self->{after_job_run_code}) {
          $self->{after_job_run_code}->($self->{after_job_run_vars});
        }
        sleep $self->poll_interval;
      }
      exit;
    }
    sleep $self->wait_interval;
  }

  # 异步非阻塞等待并发任务运行结束
  my $exitPid;
  while (keys(%childPids)) {
    while (($exitPid = waitpid(-1, WNOHANG)) > 0) {
      delete($childPids{$exitPid});
      sleep $self->wait_interval;
    }



( run in 0.833 second using v1.01-cache-2.11-cpan-39bf76dae61 )