AnyEvent-Gearman-WorkerPool
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
job_servers => [@{$conf{job_servers}}],
);
$w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
$w->register_function( "AnyEvent::Gearman::WorkerPool_".$self->boss_channel."::report" => sub{
my $job = shift;
my $workload = thaw($job->workload);
if( $workload ){
my $status = $workload->{status};
my ($key,$idx) = split(/__/,$workload->{channel});
DEBUG "SB $status $key $idx";
if( $status eq 'busy'){
$self->slots($key)->[$idx]->is_busy(1);
}
elsif( $status eq 'idle'){
$self->slots($key)->[$idx]->is_busy(0);
}
}
$job->complete;
} );
push(@reporters, $w);
}
$self->reporters(\@reporters);
}
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
config=>
{
global=>{ # common config
job_servers=>['localhost'], # gearmand servers
libs=>['./lib'], # perl5 library paths
max=>3, # max workers
},
slots=>{
'TestWorker'=>{ # module package name which extends AnyEvent::Gearman::WorkerPool::Worker.
min=>20, # min workers, count when started.
max=>50, # overrides global config's max. Workers will extend when all workers are busy.
workleft=>10, # workleft is life of worker. A worker will be respawned after used 10 times.
# if workleft is set to 0, a worker will never be respawned.
# this feature is useful if the worker code may have memory leaks.
},
# you can place more worker modules here.
}
}
);
$pool->start();
lib/AnyEvent/Gearman/WorkerPool/Slot.pm view on Meta::CPAN
use AnyEvent;
use AnyEvent::Gearman::WorkerPool::Worker;
use Data::Dumper;
# Array-ref settings. libs gets a fresh empty array per instance;
# job_servers has no default and must be passed to the constructor.
has libs        => ( is => 'rw', isa => 'ArrayRef', default  => sub { [] } );
has job_servers => ( is => 'rw', isa => 'ArrayRef', required => 1 );

# Untyped read-write settings with no default.
# NOTE(review): presumably workleft is the remaining job budget of the
# worker and worker_package / worker_channel identify the worker this
# slot runs -- confirm against the code that constructs the slot.
has workleft       => ( is => 'rw' );
has worker_package => ( is => 'rw' );
has worker_channel => ( is => 'rw' );

# State flags read by is_idle / is_running. A new slot starts out
# stopped (1) and not busy (0), so is_idle is false until is_stopped
# is cleared.
has is_busy    => ( is => 'rw', default => 0 );
has is_stopped => ( is => 'rw', default => 1 );

# Defaults to the empty string.
has boss_channel => ( is => 'rw', default => '' );

# NOTE(review): presumably the watcher and pid of the worker process
# owned by this slot -- confirm where they are assigned.
has worker_watcher => ( is => 'rw' );
has worker_pid     => ( is => 'rw' );
# Construction hook. It currently performs no initialisation: it only
# picks up the invocant, which is also the value the sub evaluates to.
sub BUILD {
    my $self = $_[0];
}
# True when the slot is running and not busy.
# A slot that is not running is never idle; in that case the (false)
# result of is_running is returned and is_busy is not consulted.
sub is_idle {
    my ($self) = @_;

    my $running = $self->is_running;
    return $running unless $running;

    return !$self->is_busy;
}
# True unless the slot has been flagged as stopped; this is the plain
# negation of the is_stopped attribute.
sub is_running {
    my ($self) = @_;

    my $stopped = $self->is_stopped;
    return !$stopped;
}
sub stop{
DEBUG 'stop called';
my $self = shift;
$self->is_stopped(1);
lib/AnyEvent/Gearman/WorkerPool/Worker.pm view on Meta::CPAN
# Constructor arguments. boss_channel is marked required but also has
# an empty-string default, so omitting it does not fail construction.
has job_servers  => ( is => 'rw', required => 1 );
has boss_channel => ( is => 'rw', required => 1, default => '' );
has channel      => ( is => 'rw', required => 1 );

# Integer job budget. The job handler only decrements it while it is
# positive and reacts when it reaches exactly 0, so the default of -1
# never counts down.
has workleft => ( is => 'rw', isa => 'Int', default => -1 );

# internal
has exported   => ( is => 'ro', default => sub { [] } );
has worker     => ( is => 'rw' );
has is_stopped => ( is => 'rw' );
has is_busy    => ( is => 'rw' );
has reporter   => ( is => 'rw' );
has cv         => ( is => 'rw' );
sub BUILD{
my $self = shift;
$self->cv->begin;
my $js = $self->job_servers;
lib/AnyEvent/Gearman/WorkerPool/Worker.pm view on Meta::CPAN
my $fname = $m->fully_qualified_name;
my $fcode = $m->body;
$w->register_function($fname =>
sub{
my $job = shift;
my $workload = $job->workload;
DEBUG "[$fname] '$workload' workleft:".$self->workleft;
$self->report('BUSY');
$self->is_busy(1);
my $res;
eval{
$res = $fcode->($self,$job);
};
if ($@){
ERROR $@;
$w->fail;
return;
}
$self->report('IDLE');
$self->is_busy(0);
if( $self->workleft > 0 ){
$self->workleft($self->workleft-1);
}
if( $self->is_stopped ){
$self->stop_safe('stopped');
}
if( $self->workleft == 0 ){
( run in 0.330 second using v1.01-cache-2.11-cpan-87723dcf8b7 )