Gearman-SlotManager
view release on metacpan or search on metacpan
lib/Gearman/Slot.pm view on Meta::CPAN
use Gearman::SlotWorker;
use Scalar::Util qw(weaken);
use Data::Dumper;
# --- configuration --------------------------------------------------------

# Array reference, empty unless the caller supplies one.
# NOTE(review): presumably extra library paths for the worker -- confirm
# against the code that consumes it.
has libs => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub{[]},
);

# Array reference of job servers; the constructor refuses to run without it.
has job_servers => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# Plain read/write slots with no type constraint and no default.
has workleft       => ( is => 'rw' );
has worker_package => ( is => 'rw' );
has worker_channel => ( is => 'rw' );

# --- runtime state --------------------------------------------------------

# A freshly built slot reports "not busy" and "stopped" (see is_idle /
# is_running, which are derived from these two flags).
has is_busy    => ( is => 'rw', default => 0 );
has is_stopped => ( is => 'rw', default => 1 );

# Base URL used for status reporting; empty string when not configured.
has sbbaseurl => (
    is      => 'rw',
    default => sub{''},
);

# Bookkeeping for the worker behind this slot.
# NOTE(review): presumably the child-process watcher and its pid -- confirm.
has worker_watcher => ( is => 'rw' );
has worker_pid     => ( is => 'rw' );
# Post-construction hook. Performs no extra initialisation; it only unpacks
# its invocant and hands it back.
sub BUILD {
    my ($self) = @_;
    return $self;
}
# True when the slot is running and not currently busy.
#
# A slot that is not running is never idle: in that case the (false) value
# produced by is_running is returned unchanged. Otherwise the result is the
# negation of is_busy.
sub is_idle {
    my ($self) = @_;

    my $running = $self->is_running;
    return $running unless $running;

    return !$self->is_busy;
}
# True when the slot has not been flagged as stopped; simply the logical
# negation of the is_stopped attribute.
sub is_running {
    my ($self) = @_;

    my $stopped = $self->is_stopped;
    return !$stopped;
}
sub stop{
DEBUG 'stop called';
my $self = shift;
$self->is_stopped(1);
lib/Gearman/SlotManager.pm view on Meta::CPAN
worker_channel=>$worker.'__'.$_,
sbbaseurl=>'http://localhost:'.$self->port,
);
push( @slots, $slot);
}
$self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
}
my $httpd = AnyEvent::HTTPD->new(port=>$self->port);
$httpd->reg_cb (
'/busy'=>sub{
my ($httpd,$req) = @_;
DEBUG "SB busy ".$req->parm('channel');
my ($key,$idx) = split(/__/,$req->parm('channel'));
DEBUG "SB busy $key $idx";
$self->slots($key)->[$idx]->is_busy(1);
$req->respond({content=>['text/plain','ok']});
},
'/idle'=>sub{
my ($httpd,$req) = @_;
DEBUG "SB idle ".$req->parm('channel');
my ($key,$idx) = split(/__/,$req->parm('channel'));
DEBUG "SB idle $key $idx";
$self->slots($key)->[$idx]->is_busy(0);
$req->respond({content=>['text/plain','ok']});
},
);
$self->httpd($httpd);
weaken($self);
}
sub slots{
my $self = shift;
my $key = shift;
lib/Gearman/SlotWorker.pm view on Meta::CPAN
# --- options (supplied by the caller) -------------------------------------

# Array reference of job servers; mandatory.
has job_servers => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# Channel identifier for this worker; mandatory.
has channel => (
    is       => 'rw',
    required => 1,
);

# Integer job budget. Defaults to -1; the job wrapper in this file only
# counts down positive values and stops the worker when the value hits 0.
has workleft => (
    is      => 'rw',
    isa     => 'Int',
    default => -1,
);

# --- internal state -------------------------------------------------------

# Read-only list of Class::MOP::Method objects, empty by default.
has exported => (
    is      => 'ro',
    isa     => 'ArrayRef[Class::MOP::Method]',
    default => sub{[]},
);

has worker     => ( is => 'rw' );
has is_stopped => ( is => 'rw' );
has is_busy    => ( is => 'rw' );

# Base URL used for status reporting; empty string when not configured.
has sbbaseurl => (
    is      => 'rw',
    default => sub{''},
);
sub BUILD{
my $self = shift;
# register
my $meta = $self->meta();
my $package = $meta->{package};
my $exported = $self->exported();
lib/Gearman/SlotWorker.pm view on Meta::CPAN
DEBUG "register ".$m->fully_qualified_name;
my $fname = $m->fully_qualified_name;
my $fcode = $m->body;
$w->register_function($fname =>
sub{
my $job = shift;
my $workload = $job->arg;
DEBUG "[$fname] '$workload' workleft:".$self->workleft;
$self->report('BUSY');
$self->is_busy(1);
my $res;
eval{
$res = $fcode->($self,$workload);
};
if ($@){
ERROR $@;
return;
}
$self->report('IDLE');
$self->is_busy(0);
if( $self->workleft > 0 ){
$self->workleft($self->workleft-1);
}
if( $self->is_stopped ){
$self->stop_safe('stopped');
}
if( $self->workleft == 0 ){
$self->stop_safe('overworked');
}
( run in 0.408 second using v1.01-cache-2.11-cpan-87723dcf8b7 )