Gearman-SlotManager

 view release on metacpan or  search on metacpan

lib/Gearman/Slot.pm  view on Meta::CPAN


use Devel::GlobalDestruction;
# ABSTRACT: Slot class
our $VERSION = '0.3'; # VERSION
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($ERROR);

use Any::Moose;
use AnyEvent;
use Gearman::SlotWorker;
use Scalar::Util qw(weaken);
use Data::Dumper;

# Extra include directories; elsewhere in this class each entry is
# rendered as a "-I<dir>" switch.
has 'libs' => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { return []; },
);

# Job server addresses (mandatory); elsewhere in this class they are
# rendered as a bracketed, comma-separated list of double-quoted strings.
has 'job_servers' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# Per-worker settings read back when the worker is launched.
# NOTE(review): workleft is presumably the job budget handed to the
# worker -- confirm against Gearman::SlotWorker.
has 'workleft'       => ( is => 'rw' );
has 'worker_package' => ( is => 'rw' );
has 'worker_channel' => ( is => 'rw' );

# State flags: a new slot is not busy and is stopped.
has 'is_busy'    => ( is => 'rw', default => 0 );
has 'is_stopped' => ( is => 'rw', default => 1 );

lib/Gearman/Slot.pm  view on Meta::CPAN

                DEBUG '------------------ child restart ------------------------';
                $self->start();
            }
            else{
                DEBUG 'kill child OK';
                $self->worker_pid(undef);
                $self->worker_watcher(undef);
            }
        });
        $self->is_stopped(0);
        weaken($self);
    }
    else{
        my $class = $self->worker_package;
        my $worker_channel = $self->worker_channel;
        my $libs = join(' ',map{"-I$_"}@{$self->libs});;
        my $workleft = $self->workleft;
        my $sbbaseurl = $self->sbbaseurl;

        my $job_servers = '['.join(',',map{"\"$_\""}@{$self->job_servers}).']';

lib/Gearman/SlotManager.pm  view on Meta::CPAN

use namespace::autoclean;
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);
use Data::Dumper;
use Any::Moose;

use AnyEvent;
use AnyEvent::HTTPD;
use Scalar::Util qw(weaken);

use Gearman::Slot;
# Map from slot key to a hash whose "slots" element holds that key's slots.
has 'slotmap' => (
    is      => 'rw',
    isa     => 'HashRef',
    default => sub { return {}; },
);

# Manager configuration (mandatory).
has 'config' => (
    is       => 'rw',
    isa      => 'HashRef',
    required => 1,
);

# Holder for the periodic AnyEvent timer that drives on_idle.
has 'idle_watcher' => ( is => 'rw' );

# Holder for the HTTP daemon object created in BUILD.
has 'httpd' => ( is => 'rw' );

# Port number; defaults to 55995.
# NOTE(review): presumably the port the HTTP daemon listens on -- confirm.
has 'port' => (
    is      => 'rw',
    default => sub { return 55995; },
);
sub BUILD{
    my $self = shift;

lib/Gearman/SlotManager.pm  view on Meta::CPAN

        '/idle'=>sub{
            my ($httpd,$req) = @_;
            DEBUG "SB idle ".$req->parm('channel');
            my ($key,$idx) = split(/__/,$req->parm('channel'));
            DEBUG "SB idle $key $idx";
            $self->slots($key)->[$idx]->is_busy(0);
            $req->respond({content=>['text/plain','ok']});
        },
    );
    $self->httpd($httpd);
    weaken($self);
}

# Look up the slots registered under $key in the slot map.
#
# Parameters:
#   $key - key into $self->slotmap.
#
# Returns whatever is stored at slotmap->{$key}{slots} (callers treat it
# as an array reference), or undef when $key has no entry.
#
# The lookup is done in two steps on purpose: the chained form
# $self->slotmap->{$key}->{slots} would autovivify an empty hash at
# slotmap->{$key} for an unknown key.  Keys can come from an HTTP request
# parameter, and other methods iterate "keys %{$self->slotmap}", so such
# a stray entry would later be visited and dereferenced.
sub slots{
    my ($self, $key) = @_;
    my $entry = $self->slotmap->{$key};

    # Always hand back exactly one value (possibly undef), as the original
    # single-expression return did, so list-context callers see no change.
    return defined $entry ? $entry->{slots} : undef;
}

sub conf{
    my $self = shift;

lib/Gearman/SlotManager.pm  view on Meta::CPAN

    foreach my $key (keys %{$self->slotmap}){
        my $slots = $self->slots($key);
        my $conf = $self->conf($key);
        my $min = $conf->{min};
        foreach my $i ( 0 .. $min-1 ){
            $slots->[$i]->start();
        }
    }
    my $iw = AE::timer 0,5, sub{$self->on_idle;};
    $self->idle_watcher($iw);
    #weaken($self);
}

sub on_idle{
    my $self = shift;
    DEBUG "ON_IDLE";
    foreach my $key (keys %{$self->slotmap}){
        my @slots = @{$self->slots($key)};
        my %conf = %{$self->conf($key)};
        my $idle = 0;
        my $running = 0;

lib/Gearman/SlotWorker.pm  view on Meta::CPAN

# ABSTRACT: A worker launched by Slot

our $VERSION = '0.3'; # VERSION
use Devel::GlobalDestruction;
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($DEBUG);
Log::Log4perl->easy_init($ERROR);

use Any::Moose;
use Gearman::Worker;
use Scalar::Util qw(weaken);
use LWP::Simple;

# options

# Job server addresses (mandatory).
has 'job_servers' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# Channel identifier (mandatory); sent as the "channel" query parameter
# by report().
has 'channel' => (
    is       => 'rw',
    required => 1,
);

# Remaining-work counter; the worker stops itself as 'overworked' once
# this reads 0.
# NOTE(review): the default of -1 presumably means "no limit" -- confirm.
has 'workleft' => (
    is      => 'rw',
    isa     => 'Int',
    default => -1,
);

# internal

# Method objects collected during construction (read-only accessor).
has 'exported' => (
    is      => 'ro',
    isa     => 'ArrayRef[Class::MOP::Method]',
    default => sub { return []; },
);

# The underlying worker object whose ->work loop is run by work().
has 'worker' => ( is => 'rw' );

lib/Gearman/SlotWorker.pm  view on Meta::CPAN

            {
                if( !$meta->has_attribute($methname) ){
                    DEBUG 'filtered: '.$method->fully_qualified_name;
                    push(@{$exported},$method);
                }
            }
        }
    }
    
    $self->register();
    weaken($self);
}

# Send a status message for this worker's channel to the status endpoint.
#
# Parameters:
#   $event - message name; it is lower-cased and used as the URL path
#            segment after sbbaseurl.
#
# Does nothing when sbbaseurl is false.  Otherwise issues an HTTP GET to
#   <sbbaseurl>/<message>?channel=<channel>
# via LWP::Simple::get; the result of that call is the sub's implicit
# return value.
# NOTE(review): the channel is placed in the query string as-is, without
# URL-escaping -- confirm channels never contain reserved characters.
sub report{
    my ($self, $event) = @_;
    my $msg = lc($event);
    if( my $base = $self->sbbaseurl ){
        my $channel = $self->channel;
        DEBUG "report $msg ".$channel;
        get(join('', $base, '/', $msg, '?channel=', $channel));
    }
}

lib/Gearman/SlotWorker.pm  view on Meta::CPAN

                if( $self->workleft == 0 ){
                    $self->stop_safe('overworked');
                }


                return $res;
            }
        );
    }
    $self->worker($w);
    #weaken($w);
    weaken($self);
}

# Run the underlying worker's work loop.
#
# The worker is given a stop_if callback that reports $self->is_stopped.
# NOTE(review): presumably the worker leaves its loop once that callback
# returns true -- see the Gearman::Worker documentation.
# After ->work returns, a debug line is logged.
sub work{
    my ($self) = @_;
    my $should_stop = sub{ $self->is_stopped };
    my $worker = $self->worker;
    $worker->work(stop_if=>$should_stop);
    DEBUG "stop completely";
}

sub stop_safe{
    my $self = shift;

t/02-Slot.t  view on Meta::CPAN

package main;

use lib 't/lib';
use Test::More tests=>3;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::Slot;
use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;

my $t = AE::timer 10,0,sub{ $cv->send('timeout')};

use_ok('Gearman::Server');
gstart($port);

my $slot = Gearman::Slot->new(

t/03-SlotOverwork.t  view on Meta::CPAN

package main;

use lib 't/lib';
use Test::More tests=>5;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::Slot;
use Scalar::Util qw(weaken);
use Log::Log4perl qw(:easy);
#Log::Log4perl->easy_init($ERROR);
Log::Log4perl->easy_init($ERROR);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;

my $t = AE::timer 10,0,sub{ $cv->send('timeout')};

use_ok('Gearman::Server');

t/04-SlotManager.t  view on Meta::CPAN

package main;

use lib 't/lib';
use Test::More tests=>2;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;

use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");
my $cv = AE::cv;

my $t = AE::timer 10,0,sub{ $cv->send('timeout')};

use_ok('Gearman::Server');
gstart($port);

my $slotman = Gearman::SlotManager->new(

t/05-SlotManagerWork.t  view on Meta::CPAN


use lib 't/lib';
use Test::More tests=>12;
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($ERROR);

use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");

use_ok('Gearman::Server');
gstart($port);

my $cv = AE::cv;

my $sig = AE::signal 'INT'=> sub{ 
DEBUG "TERM!!";

testManager.pl  view on Meta::CPAN

package main;

use lib 't/lib','./lib';
use Gear;
use AnyEvent;
use AnyEvent::Gearman;
use Gearman::SlotManager;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($DEBUG);

use Scalar::Util qw(weaken);
my $port = '9955';
my @js = ("localhost:$port");

gstart($port);

my $cv = AE::cv;

my $sig = AE::signal 'INT'=> sub{ 
    DEBUG "TERM!!";
    $cv->send;



( run in 0.394 second using v1.01-cache-2.11-cpan-1f129e94a17 )