AnyEvent-Gearman-WorkerPool

 view release on metacpan or  search on metacpan

lib/AnyEvent/Gearman/WorkerPool.pm  view on Meta::CPAN

package AnyEvent::Gearman::WorkerPool;
# ABSTRACT: Managing Worker's lifecycle with Slots
our $VERSION = '1.0'; # VERSION
use Log::Log4perl qw(:easy);

use Data::Dumper;
use Moose;
use Storable qw(freeze thaw);

use AnyEvent;
use AnyEvent::Gearman::Worker;
use AnyEvent::Gearman::Worker::RetryConnection;

use AnyEvent::Gearman::WorkerPool::Slot;
# Per-worker bookkeeping, keyed by worker package name. BUILD stores a
# { conf => \%conf, slots => \@slots } hashref under each key.
has 'slotmap' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { return {}; },
);

# Pool configuration (required). BUILD reads its 'global' and 'slots' entries.
has 'config' => (
    is       => 'rw',
    isa      => 'HashRef',
    required => 1,
);

# Declared without a type constraint or default.
# NOTE(review): presumably holds an AnyEvent idle watcher -- confirm at use site.
has 'idle_watcher' => (
    is => 'rw',
);

# Identifier for this pool instance; defaults to the value of time().
# BUILD hands it to every Slot and embeds it in the name of the registered
# "report" function.
has 'boss_channel' => (
    is      => 'rw',
    default => sub { return time; },
);

# Declared without a type constraint or default.
# NOTE(review): presumably the per-worker report listeners created in BUILD
# -- confirm against the rest of the file.
has 'reporters' => (
    is => 'rw',
);
sub BUILD{
    my $self = shift;

    my $conf = $self->config;
    my %global = %{$conf->{'global'}};
    my %baseconf = (
        job_servers=>[''],
        min=>1,
        max=>1,
        workleft=>0,
    );
    %global = (%baseconf,%global);
    
    my @reporters;
    my %confs = %{$conf->{slots}};
    foreach my $worker (keys %confs){
        my %conf = %{$confs{$worker}};

        %conf = (%global,%conf);
        DEBUG Dumper(\%conf);

        my @slots;
        foreach (0 .. $conf{max}-1){
            my $slot = AnyEvent::Gearman::WorkerPool::Slot->new(
                job_servers=>$conf{job_servers},
                libs=>$conf{libs},
                workleft=>$conf{workleft},
                boss_channel=>$self->boss_channel,
                worker_package=>$worker,
                worker_channel=>$worker.'__'.$_,
            );
            push( @slots, $slot);
        }
        $self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};

        my $w = AnyEvent::Gearman::Worker->new(
            job_servers => [@{$conf{job_servers}}],
        );
        $w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
        $w->register_function( "AnyEvent::Gearman::WorkerPool_".$self->boss_channel."::report" => sub{
            my $job = shift;
            my $workload = thaw($job->workload);
            if( $workload ){
                my $status = $workload->{status};
                my ($key,$idx) = split(/__/,$workload->{channel});
                DEBUG "SB $status $key $idx";
                if( $status eq 'busy'){



( run in 1.362 second using v1.01-cache-2.11-cpan-39bf76dae61 )