AnyEvent-Gearman-WorkerPool
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
use Data::Dumper;
use Moose;
use Storable qw(freeze thaw);
use AnyEvent;
use AnyEvent::Gearman::Worker;
use AnyEvent::Gearman::Worker::RetryConnection;
use AnyEvent::Gearman::WorkerPool::Slot;
has slotmap=>(is=>'rw', isa=>'HashRef', default=>sub{ return {}; });
has config=>(is=>'rw', isa=>'HashRef',required=>1);
has idle_watcher=>(is=>'rw');
has boss_channel=>(is=>'rw', default=>sub{time});
has reporters=>(is=>'rw');
sub BUILD{
my $self = shift;
my $conf = $self->config;
my %global = %{$conf->{'global'}};
my %baseconf = (
job_servers=>[''],
min=>1,
max=>1,
workleft=>0,
);
%global = (%baseconf,%global);
my @reporters;
my %confs = %{$conf->{slots}};
foreach my $worker (keys %confs){
my %conf = %{$confs{$worker}};
%conf = (%global,%conf);
DEBUG Dumper(\%conf);
my @slots;
foreach (0 .. $conf{max}-1){
my $slot = AnyEvent::Gearman::WorkerPool::Slot->new(
job_servers=>$conf{job_servers},
libs=>$conf{libs},
workleft=>$conf{workleft},
boss_channel=>$self->boss_channel,
worker_package=>$worker,
worker_channel=>$worker.'__'.$_,
);
push( @slots, $slot);
}
$self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
my $w = AnyEvent::Gearman::Worker->new(
job_servers => [@{$conf{job_servers}}],
);
$w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
$w->register_function( "AnyEvent::Gearman::WorkerPool_".$self->boss_channel."::report" => sub{
my $job = shift;
my $workload = thaw($job->workload);
if( $workload ){
my $status = $workload->{status};
my ($key,$idx) = split(/__/,$workload->{channel});
DEBUG "SB $status $key $idx";
if( $status eq 'busy'){
$self->slots($key)->[$idx]->is_busy(1);
}
elsif( $status eq 'idle'){
$self->slots($key)->[$idx]->is_busy(0);
}
}
$job->complete;
} );
push(@reporters, $w);
}
$self->reporters(\@reporters);
}
sub slots{
my $self = shift;
my $key = shift;
return $self->slotmap->{$key}->{slots};
}
sub conf{
my $self = shift;
my $key = shift;
return $self->slotmap->{$key}->{conf};
}
sub start{
DEBUG __PACKAGE__." start";
my $self = shift;
foreach my $key (keys %{$self->slotmap}){
my $slots = $self->slots($key);
my $conf = $self->conf($key);
my $min = $conf->{min};
foreach my $i ( 0 .. $min-1 ){
$slots->[$i]->start();
}
}
my $iw = AE::timer 0,5, sub{$self->on_idle;};
$self->idle_watcher($iw);
}
sub on_idle{
my $self = shift;
DEBUG "ON_IDLE";
foreach my $key (keys %{$self->slotmap}){
my @slots = @{$self->slots($key)};
my %conf = %{$self->conf($key)};
my $idle = 0;
my $running = 0;
foreach my $s ( @slots ){
$idle += $s->is_idle;
$running += $s->is_running;
}
DEBUG "[$key] idle: $idle, running: $running";
if( !$idle ){
if( $running < $conf{max} ){
DEBUG "expand $key";
my @stopped = grep{$_->is_stopped}@slots;
shift(@stopped)->start;
}
}
else{
if( $running > $conf{min} ){
DEBUG "reduce $key";
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
sub stop{
DEBUG __PACKAGE__." stop";
my $self = shift;
$self->idle_watcher(undef);
foreach my $key (keys %{$self->slotmap}){
my $slots = $self->slots($key);
foreach my $s ( @{$slots} ){
$s->stop() unless $s->is_stopped;
}
}
}
sub DEMOLISH{
DEBUG __PACKAGE__.' DEMOLISHED';
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::Gearman::WorkerPool - Managing Worker's lifecycle with Slots
=head1 VERSION
version 1.0
=head1 SYNOPSIS
worker_pool.pl
#!/usr/bin/env perl
use AnyEvent;
use AnyEvent::Gearman::WorkerPool;
my $cv = AE::cv;
my $sig = AE::signal 'INT'=> sub{
DEBUG "TERM!!";
$cv->send;
};
my $pool = AnyEvent::Gearman::WorkerPool->new(
config=>
{
global=>{ # common config
job_servers=>['localhost'], # gearmand servers
libs=>['./lib'], # perl5 library paths
max=>3, # max workers
},
slots=>{
'TestWorker'=>{ # module package name which extends AnyEvent::Gearman::WorkerPool::Worker.
min=>20, # min workers, count when started.
max=>50, # overrides global config's max. Workers will extend when all workers are busy.
workleft=>10, # workleft is life of worker. A worker will be respawned after used 10 times.
# if workleft is set as 0, a worker will be never respawned.
# this feature is useful if worker code may has some memory leaks.
},
# you can place more worker modules here.
}
}
);
$pool->start();
my $res = $cv->recv;
undef($tt);
$pool->stop;
undef($pool);
lib/TestWorker.pm
package TestWorker;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($DEBUG);
use Moose;
extends 'AnyEvent::Gearman::WorkerPool::Worker';
sub slowreverse{
DEBUG 'slowreverse';
my $self = shift;
my $job = shift;
my t = AE::timer 1,0, sub{
my $res = reverse($job->workload);
$job->complete( $res );
};
}
sub reverse{
DEBUG 'reverse';
my $self = shift;
my $job = shift;
my $res = reverse($job->workload);
DEBUG $res;
$job->complete( $res );
}
sub _private{
my $self = shift;
my $job = shift;
DEBUG "_private:".$job->workload;
$job->complete();
}
1;
client.pl
#!/usr/bin/env perl
use AnyEvent;
use AnyEvent::Gearman;
my $cv = AE::cv;
my $c = gearman_client 'localhost';
$c->add_task(
( run in 2.964 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )