view release on metacpan or search on metacpan
lib/Gearman/Slot.pm view on Meta::CPAN
use Devel::GlobalDestruction;
# ABSTRACT: Slot class
our $VERSION = '0.3'; # VERSION
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($ERROR);
use Any::Moose;
use AnyEvent;
use Gearman::SlotWorker;
use Scalar::Util qw(weaken);
use Data::Dumper;
has libs=>(is=>'rw',isa=>'ArrayRef',default=>sub{[]});
has job_servers=>(is=>'rw',isa=>'ArrayRef',required=>1);
has workleft=>(is=>'rw');
has worker_package=>(is=>'rw');
has worker_channel=>(is=>'rw');
has is_busy=>(is=>'rw',default=>0);
has is_stopped=>(is=>'rw',default=>1);
lib/Gearman/Slot.pm view on Meta::CPAN
DEBUG '------------------ child restart ------------------------';
$self->start();
}
else{
DEBUG 'kill child OK';
$self->worker_pid(undef);
$self->worker_watcher(undef);
}
});
$self->is_stopped(0);
weaken($self);
}
else{
my $class = $self->worker_package;
my $worker_channel = $self->worker_channel;
my $libs = join(' ',map{"-I$_"}@{$self->libs});;
my $workleft = $self->workleft;
my $sbbaseurl = $self->sbbaseurl;
my $job_servers = '['.join(',',map{"\"$_\""}@{$self->job_servers}).']';
lib/Gearman/SlotManager.pm view on Meta::CPAN
use namespace::autoclean;
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);
use Data::Dumper;
use Any::Moose;
use AnyEvent;
use AnyEvent::HTTPD;
use Scalar::Util qw(weaken);
use Gearman::Slot;
has slotmap=>(is=>'rw', isa=>'HashRef', default=>sub{ return {}; });
has config=>(is=>'rw', isa=>'HashRef',required=>1);
has idle_watcher=>(is=>'rw');
has httpd=>(is=>'rw');
has port=>(is=>'rw',default=>sub{return 55995;});
sub BUILD{
my $self = shift;
lib/Gearman/SlotManager.pm view on Meta::CPAN
'/idle'=>sub{
my ($httpd,$req) = @_;
DEBUG "SB idle ".$req->parm('channel');
my ($key,$idx) = split(/__/,$req->parm('channel'));
DEBUG "SB idle $key $idx";
$self->slots($key)->[$idx]->is_busy(0);
$req->respond({content=>['text/plain','ok']});
},
);
$self->httpd($httpd);
weaken($self);
}
sub slots{
my $self = shift;
my $key = shift;
return $self->slotmap->{$key}->{slots};
}
sub conf{
my $self = shift;
lib/Gearman/SlotManager.pm view on Meta::CPAN
foreach my $key (keys %{$self->slotmap}){
my $slots = $self->slots($key);
my $conf = $self->conf($key);
my $min = $conf->{min};
foreach my $i ( 0 .. $min-1 ){
$slots->[$i]->start();
}
}
my $iw = AE::timer 0,5, sub{$self->on_idle;};
$self->idle_watcher($iw);
#weaken($self);
}
sub on_idle{
my $self = shift;
DEBUG "ON_IDLE";
foreach my $key (keys %{$self->slotmap}){
my @slots = @{$self->slots($key)};
my %conf = %{$self->conf($key)};
my $idle = 0;
my $running = 0;
lib/Gearman/SlotWorker.pm view on Meta::CPAN
# ABSTRACT: A worker launched by Slot
our $VERSION = '0.3'; # VERSION
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);
use Any::Moose;
use Gearman::Worker;
use Scalar::Util qw(weaken);
use LWP::Simple;
# options
has job_servers=>(is=>'rw',isa=>'ArrayRef', required=>1);
has channel=>(is=>'rw',required=>1);
has workleft=>(is=>'rw',isa=>'Int', default=>-1);
# internal
has exported=>(is=>'ro',isa=>'ArrayRef[Class::MOP::Method]', default=>sub{[]});
has worker=>(is=>'rw');
lib/Gearman/SlotWorker.pm view on Meta::CPAN
{
if( !$meta->has_attribute($methname) ){
DEBUG 'filtered: '.$method->fully_qualified_name;
push(@{$exported},$method);
}
}
}
}
$self->register();
weaken($self);
}
sub report{
my $self = shift;
my $msg = lc(shift);
if($self->sbbaseurl){
DEBUG "report $msg ".$self->channel;
get($self->sbbaseurl.'/'.$msg.'?channel='.$self->channel);
}
}
lib/Gearman/SlotWorker.pm view on Meta::CPAN
if( $self->workleft == 0 ){
$self->stop_safe('overworked');
}
return $res;
}
);
}
$self->worker($w);
#weaken($w);
weaken($self);
}
sub work{
my $self = shift;
$self->worker->work(stop_if=>sub{ $self->is_stopped } );
DEBUG "stop completely";
}
sub stop_safe{
my $self = shift;
t/02-Slot.t view on Meta::CPAN
package main;
use lib 't/lib';
use Test::More tests=>3;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::Slot;
use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;
my $t = AE::timer 10,0,sub{ $cv->send('timeout')};
use_ok('Gearman::Server');
gstart($port);
my $slot = Gearman::Slot->new(
t/03-SlotOverwork.t view on Meta::CPAN
package main;
use lib 't/lib';
use Test::More tests=>5;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::Slot;
use Scalar::Util qw(weaken);
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($ERROR);
Log::Log4perl->easy_init($ERROR);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;
my $t = AE::timer 10,0,sub{ $cv->send('timeout')};
use_ok('Gearman::Server');
t/04-SlotManager.t view on Meta::CPAN
package main;
use lib 't/lib';
use Test::More tests=>2;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;
use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;
my $t = AE::timer 10,0,sub{ $cv->send('timeout')};
use_ok('Gearman::Server');
gstart($port);
my $slotman = Gearman::SlotManager->new(
t/05-SlotManagerWork.t view on Meta::CPAN
use lib 't/lib';
use Test::More tests=>12;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($ERROR);
use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
use_ok('Gearman::Server');
gstart($port);
my $cv = AE::cv;
my $sig = AE::signal 'INT'=> sub{
DEBUG "TERM!!";
testManager.pl view on Meta::CPAN
package main;
use lib 't/lib','./lib';
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($DEBUG);
use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
gstart($port);
my $cv = AE::cv;
my $sig = AE::signal 'INT'=> sub{
DEBUG "TERM!!";
$cv->send;