AnyEvent-Gearman-WorkerPool

 view release on metacpan or  search on metacpan

lib/AnyEvent/Gearman/WorkerPool.pm  view on Meta::CPAN

                my $status = $workload->{status};
                my ($key,$idx) = split(/__/,$workload->{channel});
                DEBUG "SB $status $key $idx";
                if( $status eq 'busy'){
                    $self->slots($key)->[$idx]->is_busy(1);
                }
                elsif( $status eq 'idle'){
                    $self->slots($key)->[$idx]->is_busy(0);
                }
            }
            $job->complete;
        } );
        push(@reporters, $w);
    }

    $self->reporters(\@reporters);
}

# slots($key)
#
# Returns whatever is stored under {slots} in the slotmap entry for $key,
# or undef when the slotmap has no entry for $key.
#
# The entry is fetched first and only dereferenced when it exists. Chaining
# ->{$key}->{slots} directly would autovivify an empty hash for an unknown
# key inside the slotmap, and that phantom key would then show up in every
# loop over keys %{ $self->slotmap }.
sub slots{
    my ($self, $key) = @_;
    my $entry = $self->slotmap->{$key};
    return $entry ? $entry->{slots} : undef;
}

# conf($key)
#
# Returns whatever is stored under {conf} in the slotmap entry for $key,
# or undef when the slotmap has no entry for $key.
#
# The entry is looked up before it is dereferenced so that asking for an
# unknown key does not autovivify an empty entry in the slotmap (which the
# chained form ->{$key}->{conf} would do).
sub conf{
    my ($self, $key) = @_;
    my $entry = $self->slotmap->{$key};
    return $entry ? $entry->{conf} : undef;
}

# start()
#
# For every key in the slotmap, starts the first conf->{min} slots of that
# key, then installs a repeating AE::timer (first fire immediately, then
# every 5 seconds) that calls on_idle, and stores the watcher in
# idle_watcher.
sub start{
    DEBUG __PACKAGE__." start";
    my $self = shift;
    foreach my $key (keys %{$self->slotmap}){
        my $slots = $self->slots($key);
        my $conf = $self->conf($key);
        my $min = $conf->{min};
        foreach my $i ( 0 .. $min-1 ){
            $slots->[$i]->start();
        }
    }

    # The watcher is stored on $self, so a callback that closed over $self
    # directly would form a reference cycle ($self -> watcher -> callback ->
    # $self) that only stop() could break. Capture a weakened copy instead.
    require Scalar::Util;
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    my $iw = AE::timer(0, 5, sub{
        # $weak_self becomes undef once the pool object has been destroyed.
        $weak_self->on_idle if $weak_self;
    });
    $self->idle_watcher($iw);
}

# on_idle()
#
# Periodic resize step (start() schedules it on a timer). For each key in
# the slotmap:
#
#   * no slot reports is_idle and fewer than conf->{max} report is_running:
#     start the first slot that reports is_stopped ("expand");
#   * at least one slot reports is_idle and more than conf->{min} report
#     is_running: stop one running slot ("reduce").
#
# Slots are counted by truthiness of is_idle / is_running.
sub on_idle{
    my $self = shift;
    DEBUG "ON_IDLE";
    foreach my $key (keys %{$self->slotmap}){
        my @slots = @{$self->slots($key)};
        my %conf = %{$self->conf($key)};
        my @idle_slots    = grep { $_->is_idle }    @slots;
        my @running_slots = grep { $_->is_running } @slots;
        my $idle    = scalar @idle_slots;
        my $running = scalar @running_slots;
        DEBUG "[$key] idle: $idle, running: $running";
        if( !$idle ){
            if( $running < $conf{max} ){
                DEBUG "expand $key";
                my ($candidate) = grep { $_->is_stopped } @slots;
                if( $candidate ){
                    $candidate->start;
                }
                else{
                    # Nothing reports is_stopped (e.g. a slot in some other
                    # state); skip this round rather than calling a method
                    # on undef inside the timer callback.
                    DEBUG "expand $key: no stopped slot available";
                }
            }
        }
        else{
            if( $running > $conf{min} ){
                DEBUG "reduce $key";
                # Prefer a slot that is both idle and running, so that a
                # slot which is not idle is not stopped while an idle one
                # keeps running. NOTE(review): presumably is_idle already
                # implies is_running -- confirm against the slot class; the
                # extra filter and the fallback to the last running slot
                # keep this safe either way.
                my @idle_running = grep { $_->is_running } @idle_slots;
                my $victim = @idle_running ? $idle_running[-1]
                                           : $running_slots[-1];
                $victim->stop if $victim;
            }
        }
    }

}

# stop()
#
# Clears the idle_watcher attribute (where start() keeps its AE::timer
# watcher), then walks every key of the slotmap and calls stop() on each
# slot that does not already report is_stopped.
sub stop{
    DEBUG __PACKAGE__." stop";
    my ($self) = @_;

    $self->idle_watcher(undef);

    for my $name (keys %{ $self->slotmap }) {
        my $group = $self->slots($name);
        for my $slot (@$group) {
            next if $slot->is_stopped;
            $slot->stop();
        }
    }
}

# DEMOLISH
#
# Only emits a debug line naming this package; it performs no cleanup.
# NOTE(review): presumably invoked by the object system as a destructor
# hook -- confirm against the class declaration earlier in this file.
sub DEMOLISH{
    DEBUG(__PACKAGE__ . ' DEMOLISHED');
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::Gearman::WorkerPool - Managing the lifecycle of workers with slots

=head1 VERSION

version 1.0

=head1 SYNOPSIS

worker_pool.pl

	#!/usr/bin/env perl

	use AnyEvent;
	use AnyEvent::Gearman::WorkerPool;
	
	my $cv = AE::cv;

	my $sig = AE::signal 'INT'=> sub{ 
		DEBUG "TERM!!";



( run in 2.484 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )