Gearman-SlotManager
view release on metacpan or search on metacpan
lib/Gearman/SlotManager.pm view on Meta::CPAN
package Gearman::SlotManager;
# ABSTRACT: Managing Worker's lifecycle with Slots
our $VERSION = '0.3'; # VERSION
use namespace::autoclean;
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);
use Data::Dumper;
use Any::Moose;
use AnyEvent;
use AnyEvent::HTTPD;
use Scalar::Util qw(weaken);
use Gearman::Slot;
# Per-worker registry, keyed by worker package name. BUILD stores
# { conf => \%merged_conf, slots => [ Gearman::Slot, ... ] } under each key.
has slotmap => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { {} },
);

# Caller-supplied configuration (mandatory). BUILD reads its 'global' and
# 'slots' entries.
has config => (
    is       => 'rw',
    isa      => 'HashRef',
    required => 1,
);

# Holds the AE::timer guard created by start().
has idle_watcher => ( is => 'rw' );

# Holds the AnyEvent::HTTPD instance created by BUILD.
has httpd => ( is => 'rw' );

# TCP port for the embedded HTTP server; also used to build the 'sbbaseurl'
# handed to every slot.
has port => (
    is      => 'rw',
    default => sub { 55995 },
);
# Object initialiser (invoked by Any::Moose after construction).
#
# 1. Merges configuration in three layers, later layers winning:
#    built-in defaults -> config->{global} -> config->{slots}{$worker}.
# 2. Creates $conf{max} Gearman::Slot objects per worker and records them,
#    together with the merged configuration, in slotmap.
# 3. Starts an AnyEvent::HTTPD on $self->port exposing /busy and /idle, which
#    flip the is_busy flag of the slot named by the 'channel' parameter
#    ("<worker package>__<slot index>").
#
# A missing 'global' or 'slots' section is treated as empty instead of dying
# on an undefined hash reference.
sub BUILD{
    my $self = shift;
    my $conf = $self->config;

    my %baseconf = (
        job_servers=>[''],
        min=>1,
        max=>1,
        workleft=>0,
    );
    my %global = ( %baseconf, %{ $conf->{'global'} || {} } );
    my %confs  = %{ $conf->{slots} || {} };

    foreach my $worker (keys %confs){
        my %conf = ( %global, %{ $confs{$worker} || {} } );
        DEBUG Dumper(\%conf);

        my @slots;
        foreach my $idx (0 .. $conf{max}-1){
            my $slot = Gearman::Slot->new(
                job_servers=>$conf{job_servers},
                libs=>$conf{libs},
                workleft=>$conf{workleft},
                worker_package=>$worker,
                worker_channel=>$worker.'__'.$idx,
                sbbaseurl=>'http://localhost:'.$self->port,
            );
            push( @slots, $slot );
        }
        $self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
    }

    # Builds the request callback shared by /busy and /idle.
    #   $label - word used in the debug log ('busy' or 'idle')
    #   $busy  - value passed to the slot's is_busy()
    my $make_handler = sub {
        my ($label, $busy) = @_;
        return sub {
            my ($httpd, $req) = @_;

            my $channel = $req->parm('channel');
            $channel = '' unless defined $channel;
            DEBUG "SB $label $channel";

            # The channel is "<worker>__<index>". Match the LAST "__<digits>"
            # so worker names that themselves contain "__" still resolve.
            my ($key, $idx) = $channel =~ /\A(.+)__(\d+)\z/;

            # Check the map with exists before asking for the slot list, so
            # a bogus channel can neither autovivify a slotmap entry nor
            # trigger a method call on undef.
            my $slot;
            if( defined $key && $self && exists $self->slotmap->{$key} ){
                my $list = $self->slots($key);
                $slot = $list->[$idx] if $list;
            }

            unless( $slot ){
                $req->respond([404, 'not found', {'Content-Type'=>'text/plain'}, 'unknown channel']);
                return;
            }

            DEBUG "SB $label $key $idx";
            $slot->is_busy($busy);
            $req->respond({content=>['text/plain','ok']});
        };
    };

    my $httpd = AnyEvent::HTTPD->new(port=>$self->port);
    $httpd->reg_cb (
        '/busy'=>$make_handler->('busy', 1),
        '/idle'=>$make_handler->('idle', 0),
    );
    $self->httpd($httpd);

    # The handlers above close over this very $self variable; weakening it
    # keeps the object -> httpd -> callback -> object chain from being a
    # strong reference cycle.
    weaken($self);
}
# Look up the slot list registered for a worker.
#
# Arguments: $key - worker package name (the slotmap key).
# Returns:   the array ref stored under slotmap->{$key}{slots}, or undef when
#            $key is undefined or not registered.
sub slots{
    my ($self, $key) = @_;
    my $map = $self->slotmap;

    # Test with exists first: a chained $map->{$key}->{slots} lookup would
    # autovivify an empty entry for an unknown key, and that stray entry
    # would then be visited by every loop over keys %{$self->slotmap}.
    # An explicit undef (not a bare return) keeps list-context callers
    # seeing exactly one value, as before.
    return undef unless defined $key && exists $map->{$key};

    return $map->{$key}->{slots};
}
# Look up the merged configuration registered for a worker.
#
# Arguments: $key - worker package name (the slotmap key).
# Returns:   the hash ref stored under slotmap->{$key}{conf}, or undef when
#            $key is undefined or not registered.
sub conf{
    my ($self, $key) = @_;
    my $map = $self->slotmap;

    # Test with exists first so that querying an unknown worker does not
    # autovivify an empty slotmap entry as a side effect. An explicit undef
    # (not a bare return) keeps list-context callers seeing one value.
    return undef unless defined $key && exists $map->{$key};

    return $map->{$key}->{conf};
}
# Start the manager: launch the first 'min' slots of every registered worker
# and install a repeating timer that calls on_idle.
#
# If a worker's 'min' is larger than the number of slots that were built for
# it, only the existing slots are started (previously this died calling
# start() on an undefined array element).
sub start{
    DEBUG __PACKAGE__." start";
    my $self = shift;

    foreach my $key (keys %{$self->slotmap}){
        my $slots = $self->slots($key);
        my $conf = $self->conf($key);

        # Never index past the end of the slot list.
        my $wanted = $conf->{min} || 0;
        my $available = scalar @{$slots};
        $wanted = $available if $wanted > $available;

        foreach my $i ( 0 .. $wanted-1 ){
            $slots->[$i]->start();
        }
    }

    # First tick immediately, then every 5 seconds.
    # NOTE(review): the callback holds a strong reference to $self while
    # $self holds the timer, so the object stays alive until idle_watcher is
    # cleared. The original left weaken($self) commented out here, so this
    # is kept as is - confirm it is intentional.
    my $iw = AE::timer 0,5, sub{$self->on_idle;};
    $self->idle_watcher($iw);
}
sub on_idle{
my $self = shift;
DEBUG "ON_IDLE";
foreach my $key (keys %{$self->slotmap}){
my @slots = @{$self->slots($key)};
my %conf = %{$self->conf($key)};
my $idle = 0;
my $running = 0;
foreach my $s ( @slots ){
$idle += $s->is_idle;
$running += $s->is_running;
}
DEBUG "[$key] idle: $idle, running: $running";
if( !$idle ){
if( $running < $conf{max} ){
DEBUG "expand $key";
my @stopped = grep{$_->is_stopped}@slots;
shift(@stopped)->start;
}
}
else{
if( $running > $conf{min} ){
DEBUG "reduce $key";
my @running = grep{$_->is_running}@slots;
pop(@running)->stop;
( run in 0.919 second using v1.01-cache-2.11-cpan-5a3173703d6 )