Gearman-SlotManager

 view release on metacpan or  search on metacpan

lib/Gearman/Slot.pm  view on Meta::CPAN

use Gearman::SlotWorker;
use Scalar::Util qw(weaken);
use Data::Dumper;

# ---------------------------------------------------------------------------
# Slot configuration.
# ---------------------------------------------------------------------------

# List of libraries; empty unless supplied.
# NOTE(review): presumably extra library paths for the worker -- confirm.
has libs => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

# Job server list; must be supplied at construction time.
has job_servers => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# No type constraint or default is declared for these three.
# The slot manager builds worker_channel as "<worker>__<index>".
has workleft => (
    is => 'rw',
);
has worker_package => (
    is => 'rw',
);
has worker_channel => (
    is => 'rw',
);

# ---------------------------------------------------------------------------
# Slot state. A freshly built slot is stopped and not busy.
# ---------------------------------------------------------------------------
has is_busy => (
    is      => 'rw',
    default => 0,
);
has is_stopped => (
    is      => 'rw',
    default => 1,
);

# Base URL, empty by default. The slot manager passes
# 'http://localhost:<port>' of its own HTTP listener here.
has sbbaseurl => (
    is      => 'rw',
    default => sub { '' },
);

# NOTE(review): presumably the watcher and pid of the spawned worker
# process -- confirm against the code that starts the worker.
has worker_watcher => (
    is => 'rw',
);
has worker_pid => (
    is => 'rw',
);


# Construction hook. It performs no initialisation; it only hands back
# the invocant it was called with.
sub BUILD {
    my ($self) = @_;
    return $self;
}

# True when the slot is running and not currently busy.
#
# Returns the (false) result of is_running when the slot is not running,
# without consulting is_busy; otherwise returns the negation of is_busy.
sub is_idle {
    my ($self) = @_;

    my $running = $self->is_running;
    return $running unless $running;

    return !$self->is_busy;
}
# True when the slot has not been stopped; this is simply the logical
# negation of the is_stopped flag.
sub is_running {
    my ($self) = @_;

    my $stopped = $self->is_stopped;
    return !$stopped;
}

sub stop{
    DEBUG 'stop called';
    my $self = shift;
    $self->is_stopped(1);

lib/Gearman/SlotManager.pm  view on Meta::CPAN

                worker_channel=>$worker.'__'.$_,
                sbbaseurl=>'http://localhost:'.$self->port,
            );
            push( @slots, $slot);
        }
        $self->slotmap->{$worker} = {conf=>\%conf, slots=>\@slots};
    }

    my $httpd = AnyEvent::HTTPD->new(port=>$self->port);
    $httpd->reg_cb (
        '/busy'=>sub{
            my ($httpd,$req) = @_;
            DEBUG "SB busy ".$req->parm('channel');
            my ($key,$idx) = split(/__/,$req->parm('channel'));
            DEBUG "SB busy $key $idx";
            $self->slots($key)->[$idx]->is_busy(1);
            $req->respond({content=>['text/plain','ok']});
        },
        '/idle'=>sub{
            my ($httpd,$req) = @_;
            DEBUG "SB idle ".$req->parm('channel');
            my ($key,$idx) = split(/__/,$req->parm('channel'));
            DEBUG "SB idle $key $idx";
            $self->slots($key)->[$idx]->is_busy(0);
            $req->respond({content=>['text/plain','ok']});
        },
    );
    $self->httpd($httpd);
    weaken($self);
}

sub slots{
    my $self = shift;
    my $key = shift;

lib/Gearman/SlotWorker.pm  view on Meta::CPAN

# ---------------------------------------------------------------------------
# Options (supplied by the caller).
# ---------------------------------------------------------------------------

# Job server list; mandatory.
has job_servers => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# Mandatory channel identifier.
# NOTE(review): presumably the slot's "<worker>__<index>" channel name --
# confirm against the code that spawns the worker.
has channel => (
    is       => 'rw',
    required => 1,
);

# Remaining job budget. The job wrapper decrements it only while it is
# positive and stops the worker once it reaches 0, so the default of -1
# never triggers that stop.
has workleft => (
    is      => 'rw',
    isa     => 'Int',
    default => -1,
);

# ---------------------------------------------------------------------------
# Internal state.
# ---------------------------------------------------------------------------

# Method objects collected by this class; read in BUILD.
has exported => (
    is      => 'ro',
    isa     => 'ArrayRef[Class::MOP::Method]',
    default => sub { [] },
);

# NOTE(review): presumably the underlying Gearman worker object -- confirm.
has worker => (
    is => 'rw',
);

# Flags consulted and updated by the job wrapper: is_busy is raised while
# a job runs; is_stopped is checked after each job completes.
has is_stopped => (
    is => 'rw',
);
has is_busy => (
    is => 'rw',
);

# Base URL, empty by default.
# NOTE(review): presumably where BUSY/IDLE reports are sent -- confirm.
has sbbaseurl => (
    is      => 'rw',
    default => sub { '' },
);

sub BUILD{
    my $self = shift;
    # register
    my $meta = $self->meta();
    my $package = $meta->{package};
    my $exported = $self->exported();

lib/Gearman/SlotWorker.pm  view on Meta::CPAN

        DEBUG "register ".$m->fully_qualified_name;
        my $fname = $m->fully_qualified_name;
        my $fcode = $m->body;
        $w->register_function($fname =>
            sub{
                my $job = shift;
                my $workload = $job->arg;

                DEBUG "[$fname] '$workload' workleft:".$self->workleft;
                $self->report('BUSY');
                $self->is_busy(1);
                my $res;
                eval{
                    $res = $fcode->($self,$workload);
                };
                if ($@){
                    ERROR $@;
                    return;
                }

                $self->report('IDLE');
                $self->is_busy(0);

                if( $self->workleft > 0 ){
                    $self->workleft($self->workleft-1);
                }
                if( $self->is_stopped ){
                    $self->stop_safe('stopped');
                }
                if( $self->workleft == 0 ){
                    $self->stop_safe('overworked');
                }



( run in 0.408 second using v1.01-cache-2.11-cpan-87723dcf8b7 )