AnyEvent-Gearman-WorkerPool
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/WorkerPool/Worker.pm view on Meta::CPAN
package AnyEvent::Gearman::WorkerPool::Worker;
# ABSTRACT: A worker launched by Slot
our $VERSION = '1.0'; # VERSION
use Log::Log4perl qw(:easy);
use AnyEvent::Gearman::Client;
use AnyEvent::Gearman::Worker;
use AnyEvent::Gearman::Worker::RetryConnection;
use Storable qw(freeze thaw);
use Moose;
# options
has job_servers=>(is=>'rw', required=>1);
has boss_channel => (is=>'rw',required=>1, default=>'');
has channel=>(is=>'rw',required=>1);
has workleft=>(is=>'rw',isa=>'Int', default=>-1);
# internal
has exported=>(is=>'ro',default=>sub{[]});
has worker=>(is=>'rw');
has is_stopped=>(is=>'rw');
has is_busy=>(is=>'rw');
has reporter=>(is=>'rw');
has cv=>(is=>'rw');
sub BUILD{
my $self = shift;
$self->cv->begin;
my $js = $self->job_servers;
if( $self->boss_channel ){
my $client = AnyEvent::Gearman::Client->new(
job_servers => [@$js]
);
$self->reporter($client);
}
# register
my $meta = $self->meta;
my $package = $meta->{package};
my $exported = $self->exported;
if( $self->workleft == 0 ){
$self->workleft(-1);
}
for my $method ( $meta->get_all_methods )
{
my $packname = $method->package_name;
next if( $packname eq __PACKAGE__ ); # skip base class
my $methname = $method->name;
if( $packname eq $package )
{
if( $methname !~ /^_/ && $methname ne uc($methname) && $methname ne 'meta' )
{
if( !$meta->has_attribute($methname) ){
#DEBUG 'filtered: '.$method->fully_qualified_name;
push(@{$exported},$method);
}
}
}
}
$self->register($js);
}
sub report{
my $self = shift;
my $msg = lc(shift);
DEBUG "report $msg boss_channel". $self->boss_channel;
return unless $self->reporter;
$self->reporter->add_task_bg(
'AnyEvent::Gearman::WorkerPool_'.$self->boss_channel.'::report'=> freeze({status=>$msg, channel=>$self->channel})
);
}
sub unregister{
my $self = shift;
foreach my $m (@{$self->exported}){
my $fname = $m->fully_qualified_name;
$self->worker->unregister_function($fname) if $self->worker;
}
}
sub register{
my $self = shift;
my $js = shift;
my $w = AnyEvent::Gearman::Worker->new(
job_servers => [@$js],
);
$w = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($w);
foreach my $m (@{$self->exported}){
DEBUG "register ".$m->fully_qualified_name;
my $fname = $m->fully_qualified_name;
my $fcode = $m->body;
$w->register_function($fname =>
sub{
my $job = shift;
my $workload = $job->workload;
DEBUG "[$fname] '$workload' workleft:".$self->workleft;
$self->report('BUSY');
$self->is_busy(1);
my $res;
eval{
$res = $fcode->($self,$job);
};
if ($@){
ERROR $@;
$w->fail;
return;
}
$self->report('IDLE');
$self->is_busy(0);
if( $self->workleft > 0 ){
$self->workleft($self->workleft-1);
}
if( $self->is_stopped ){
$self->stop_safe('stopped');
}
if( $self->workleft == 0 ){
$self->stop_safe('overworked');
}
}
);
}
$self->worker($w);
}
sub stop_safe{
my $self = shift;
my $msg = shift;
$self->is_stopped(1);
$self->unregister;
$self->worker(undef);
DEBUG "stop_safe $msg";
$self->cv->end;
}
sub DEMOLISH{
my $self = shift;
$self->unregister() if $self->worker;
DEBUG __PACKAGE__." DEMOLISHED";
}
# class member
sub Loop{
my $class = shift;
die 'Use like PACKAGE->Loop(%opts).' unless $class;
die 'You need to use your own class extending '. __PACKAGE__ .'!' if $class eq __PACKAGE__;
my %opt = @_;
my $cv = AE::cv;
my $worker;
my $sig = AE::signal INT=>sub{
$worker->stop_safe('SIGINT');
$cv->send;
};
$cv->begin(sub{ $cv->send; });
eval{
$worker = $class->new(%opt,cv=>$cv);
};
$cv->end;
( run in 1.339 second using v1.01-cache-2.11-cpan-39bf76dae61 )