AnyEvent-Gearman-WorkerPool
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
package AnyEvent::Gearman::WorkerPool;
# ABSTRACT: Managing Worker's lifecycle with Slots
our $VERSION = '1.0'; # VERSION
use Log::Log4perl qw(:easy);
use Data::Dumper;
use Moose;
use Storable qw(freeze thaw);
use AnyEvent;
use AnyEvent::Gearman::Worker;
use AnyEvent::Gearman::Worker::RetryConnection;
use AnyEvent::Gearman::WorkerPool::Slot;
# Per-worker bookkeeping, filled in by BUILD: keyed by worker package name,
# each value is a hash ref of the form { conf => \%conf, slots => \@slots }.
has 'slotmap' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { return {} },
);

# Pool configuration supplied by the caller (mandatory). BUILD reads the
# 'global' key (defaults shared by every worker) and the 'slots' key
# (per-worker settings keyed by worker package name).
has 'config' => (
    is       => 'rw',
    isa      => 'HashRef',
    required => 1,
);

# NOTE(review): not referenced in the visible part of BUILD; presumably
# holds an AnyEvent watcher -- confirm against the rest of the file.
has 'idle_watcher' => ( is => 'rw' );

# Identifier for this pool instance; defaults to the epoch time at
# construction. BUILD hands it to every Slot and embeds it in the name of
# the Gearman "report" function it registers.
has 'boss_channel' => (
    is      => 'rw',
    default => sub { time },
);

# NOTE(review): no default and no type constraint; presumably receives the
# reporter workers created in BUILD -- confirm against the rest of the file.
has 'reporters' => ( is => 'rw' );
sub BUILD{
my $self = shift;
my $conf = $self->config;
my %global = %{$conf->{'global'}};
my %baseconf = (
job_servers=>[''],
min=>1,
max=>1,
workleft=>0,
);
%global = (%baseconf,%global);
my @reporters;
my %confs = %{$conf->{slots}};
foreach my $worker (keys %confs){
my %conf = %{$confs{$worker}};
%conf = (%global,%conf);
DEBUG Dumper(\%conf);
my @slots;
foreach (0 .. $conf{max}-1){
my $slot = AnyEvent::Gearman::WorkerPool::Slot->new(
job_servers=>$conf{job_servers},
libs=>$conf{libs},
workleft=>$conf{workleft},
boss_channel=>$self->boss_channel,
worker_package=>$worker,
worker_channel=>$worker.'__'.$_,
);
push( @slots, $slot);
}
$self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
my $w = AnyEvent::Gearman::Worker->new(
job_servers => [@{$conf{job_servers}}],
);
$w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
$w->register_function( "AnyEvent::Gearman::WorkerPool_".$self->boss_channel."::report" => sub{
my $job = shift;
my $workload = thaw($job->workload);
if( $workload ){
my $status = $workload->{status};
my ($key,$idx) = split(/__/,$workload->{channel});
DEBUG "SB $status $key $idx";
if( $status eq 'busy'){
( run in 1.362 second using v1.01-cache-2.11-cpan-39bf76dae61 )