AnyEvent-Gearman-WorkerPool

 view release on metacpan or  search on metacpan

lib/AnyEvent/Gearman/WorkerPool.pm  view on Meta::CPAN

            job_servers => [@{$conf{job_servers}}],
        );
        $w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
        $w->register_function( "AnyEvent::Gearman::WorkerPool_".$self->boss_channel."::report" => sub{
            my $job = shift;
            my $workload = thaw($job->workload);
            if( $workload ){
                my $status = $workload->{status};
                my ($key,$idx) = split(/__/,$workload->{channel});
                DEBUG "SB $status $key $idx";
                if( $status eq 'busy'){
                    $self->slots($key)->[$idx]->is_busy(1);
                }
                elsif( $status eq 'idle'){
                    $self->slots($key)->[$idx]->is_busy(0);
                }
            }
            $job->complete;
        } );
        push(@reporters, $w);
    }

    $self->reporters(\@reporters);
}

lib/AnyEvent/Gearman/WorkerPool.pm  view on Meta::CPAN

		config=>
		{   
			global=>{ # common config
				job_servers=>['localhost'], # gearmand servers
				libs=>['./lib'], # perl5 library paths
				max=>3, # max workers
				},  
			slots=>{
				'TestWorker'=>{ # module package name which extends AnyEvent::Gearman::WorkerPool::Worker.
					min=>20, # min workers, count when started.
					max=>50, # overrides global config's max. Workers will extend when all workers are busy.
					workleft=>10, # workleft is life of worker. A worker will be respawned after used 10 times. 
								# if workleft is set as 0, a worker will be never respawned.
								# this feature is useful if worker code may has some memory leaks.
				},
				# you can place more worker modules here.
			}   
		}   
	);

	$pool->start();

lib/AnyEvent/Gearman/WorkerPool/Slot.pm  view on Meta::CPAN

use AnyEvent;
use AnyEvent::Gearman::WorkerPool::Worker;
use Data::Dumper;

has libs=>(is=>'rw',isa=>'ArrayRef',default=>sub{[]});
has job_servers=>(is=>'rw',isa=>'ArrayRef',required=>1);
has workleft=>(is=>'rw');
has worker_package=>(is=>'rw');
has worker_channel=>(is=>'rw');

has is_busy=>(is=>'rw',default=>0);
has is_stopped=>(is=>'rw',default=>1);
has boss_channel=>(is=>'rw',default=>'');

has worker_watcher=>(is=>'rw');
has worker_pid=>(is=>'rw');


sub BUILD{
    my $self = shift;
}

sub is_idle{
    my $self = shift;
    return ($self->is_running)&&(!$self->is_busy);
}
sub is_running{
    my $self = shift;
    return (!$self->is_stopped);
}

sub stop{
    DEBUG 'stop called';
    my $self = shift;
    $self->is_stopped(1);

lib/AnyEvent/Gearman/WorkerPool/Worker.pm  view on Meta::CPAN

has job_servers=>(is=>'rw', required=>1);
has boss_channel => (is=>'rw',required=>1, default=>'');
has channel=>(is=>'rw',required=>1);
has workleft=>(is=>'rw',isa=>'Int', default=>-1);

# internal
has exported=>(is=>'ro',default=>sub{[]});
has worker=>(is=>'rw');

has is_stopped=>(is=>'rw');
has is_busy=>(is=>'rw');
has reporter=>(is=>'rw');

has cv=>(is=>'rw');

sub BUILD{
    my $self = shift;

    $self->cv->begin;

    my $js = $self->job_servers;

lib/AnyEvent/Gearman/WorkerPool/Worker.pm  view on Meta::CPAN

        my $fname = $m->fully_qualified_name;
        my $fcode = $m->body;

        $w->register_function($fname =>
            sub{
                my $job = shift;
                my $workload = $job->workload;

                DEBUG "[$fname] '$workload' workleft:".$self->workleft;
                $self->report('BUSY');
                $self->is_busy(1);

                my $res;
                eval{
                    $res = $fcode->($self,$job);
                };
                if ($@){
                    ERROR $@;
                    $w->fail;
                    return;
                }

                $self->report('IDLE');
                $self->is_busy(0);

                if( $self->workleft > 0 ){
                    $self->workleft($self->workleft-1);
                }

                if( $self->is_stopped ){
                    $self->stop_safe('stopped');
                }

                if( $self->workleft == 0 ){



( run in 0.330 second using v1.01-cache-2.11-cpan-87723dcf8b7 )