AnyEvent-Gearman-WorkerPool
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/WorkerPool.pm view on Meta::CPAN
my $status = $workload->{status};
my ($key,$idx) = split(/__/,$workload->{channel});
DEBUG "SB $status $key $idx";
if( $status eq 'busy'){
$self->slots($key)->[$idx]->is_busy(1);
}
elsif( $status eq 'idle'){
$self->slots($key)->[$idx]->is_busy(0);
}
}
$job->complete;
} );
push(@reporters, $w);
}
$self->reporters(\@reporters);
}
sub slots{
my $self = shift;
my $key = shift;
return $self->slotmap->{$key}->{slots};
}
sub conf{
my $self = shift;
my $key = shift;
return $self->slotmap->{$key}->{conf};
}
sub start{
DEBUG __PACKAGE__." start";
my $self = shift;
foreach my $key (keys %{$self->slotmap}){
my $slots = $self->slots($key);
my $conf = $self->conf($key);
my $min = $conf->{min};
foreach my $i ( 0 .. $min-1 ){
$slots->[$i]->start();
}
}
my $iw = AE::timer 0,5, sub{$self->on_idle;};
$self->idle_watcher($iw);
}
sub on_idle{
my $self = shift;
DEBUG "ON_IDLE";
foreach my $key (keys %{$self->slotmap}){
my @slots = @{$self->slots($key)};
my %conf = %{$self->conf($key)};
my $idle = 0;
my $running = 0;
foreach my $s ( @slots ){
$idle += $s->is_idle;
$running += $s->is_running;
}
DEBUG "[$key] idle: $idle, running: $running";
if( !$idle ){
if( $running < $conf{max} ){
DEBUG "expand $key";
my @stopped = grep{$_->is_stopped}@slots;
shift(@stopped)->start;
}
}
else{
if( $running > $conf{min} ){
DEBUG "reduce $key";
my @running = grep{$_->is_running}@slots;
pop(@running)->stop;
}
}
}
}
sub stop{
DEBUG __PACKAGE__." stop";
my $self = shift;
$self->idle_watcher(undef);
foreach my $key (keys %{$self->slotmap}){
my $slots = $self->slots($key);
foreach my $s ( @{$slots} ){
$s->stop() unless $s->is_stopped;
}
}
}
sub DEMOLISH{
DEBUG __PACKAGE__.' DEMOLISHED';
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
AnyEvent::Gearman::WorkerPool - Managing Worker's lifecycle with Slots
=head1 VERSION
version 1.0
=head1 SYNOPSIS
worker_pool.pl
#!/usr/bin/env perl
use AnyEvent;
use AnyEvent::Gearman::WorkerPool;
my $cv = AE::cv;
my $sig = AE::signal 'INT'=> sub{
DEBUG "TERM!!";
( run in 2.484 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )