Gearman-Driver

 view release on metacpan or  search on metacpan

lib/Gearman/Driver/Observer.pm  view on Meta::CPAN

package Gearman::Driver::Observer;

use Moose;
use Net::Telnet::Gearman;
use POE;

=head1 NAME

Gearman::Driver::Observer - Observes Gearman status interface

=head1 DESCRIPTION

Every n seconds L<Net::Telnet::Gearman> is used to fetch the status of
free/running/busy workers from the Gearman server. L<Gearman::Driver>
decides whether to fork more workers depending on the queue size and the
MinProcesses/MaxProcesses attributes of the job methods.

Currently there's no public interface.

=cut

# Code reference invoked after every poll with a single hash reference of
# the form { data => [...], error => [...] } (see _fetch_status).
has callback => (
    isa      => 'CodeRef',
    is       => 'rw',
    required => 1,
);

# Delay between two polls, handed to the POE kernel's delay() call.
has interval => (
    isa      => 'Int',
    is       => 'rw',
    required => 1,
);

# Comma-separated list of "host:port" entries. _connect falls back to
# "localhost" for a missing host and to 4730 for a missing port.
has server => (
    isa      => 'Str',
    is       => 'rw',
    required => 1,
);

# One Net::Telnet::Gearman connection per entry in "server"; the list is
# rebuilt by _connect. With auto_deref the reader returns a plain list in
# list context, which is how _fetch_status iterates over it.
has telnet => (
    isa        => 'ArrayRef[Net::Telnet::Gearman]',
    is         => 'ro',
    default    => sub { [] },
    auto_deref => 1,
);

# The POE session created in BUILD that runs the polling handlers.
has session => (
    isa => 'POE::Session',
    is  => 'ro',
);

# Moose construction hook: opens the status connections and then creates
# the POE session whose events are dispatched to this object's methods.
sub BUILD {
    my $self = shift;

    $self->_connect;

    # POE event name => name of the method on $self that handles it.
    my %handlers = (
        _start       => '_start',
        fetch_status => '_fetch_status',
    );

    # "session" is a read-only attribute, so the slot is written directly.
    $self->{session} =
      POE::Session->create( object_states => [ $self => \%handlers ] );
}

# POE "_start" handler: schedules the first fetch_status event to fire
# after the configured interval.
sub _start {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];

    $kernel->delay( fetch_status => $self->interval );
}

# Rebuilds the list of status connections from the "server" attribute.
# The attribute is split on commas into entries and each entry on ":" into
# host and port; an empty or missing host becomes "localhost" and an empty
# or missing port becomes 4730. Any exception thrown by
# Net::Telnet::Gearman->new propagates to the caller.
sub _connect {
    my $self = shift;

    # Reset the slot first so that connections are appended to a fresh list.
    my $connections = $self->{telnet} = [];

    for my $address ( split /,/, $self->server ) {
        my ( $host, $port ) = split /:/, $address;
        $host ||= 'localhost';
        $port ||= 4730;

        push @$connections,
          Net::Telnet::Gearman->new( Host => $host, Port => $port );
    }
}

# POE "fetch_status" handler: polls every status connection, sums the
# busy/free/queue/running counters per function name across all servers,
# passes the result to the callback and schedules the next poll.
#
# The callback receives one hash reference:
#   data  => array reference of { name, busy, free, queue, running } hashes
#   error => array reference with one entry per connection that failed
#
# A failing connection never aborts the poll; its exception is collected
# and a reconnect is attempted so a later poll can succeed.
sub _fetch_status {
    my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];

    my %data  = ();
    my @error = ();

    foreach my $telnet ( $self->telnet ) {

        # The trailing "1" makes eval return true only when the block ran to
        # completion. Testing that instead of the truthiness of $@ also
        # catches exceptions that are false in boolean context.
        my $ok = eval {
            my $status = $telnet->status;

            foreach my $row (@$status) {
                my $name   = $row->name;
                my $totals = $data{$name} ||= {
                    name    => $name,
                    busy    => 0,
                    free    => 0,
                    queue   => 0,
                    running => 0,
                };

                foreach my $field (qw(busy free queue running)) {
                    $totals->{$field} += $row->$field;
                }
            }

            1;
        };
        next if $ok;

        # Copy $@ right away; the eval below would overwrite it.
        my $err = $@;
        $err = 'Unknown error'
          if !defined $err || ( !ref $err && $err eq '' );
        push @error, $err;

        # Try to re-open the telnet connection. This is deliberately best
        # effort: a failure here is ignored and the connection is simply
        # polled again on the next run.
        eval { $telnet->open };
    }

    $self->callback->( { data => [ values %data ], error => \@error } );

    $kernel->delay( fetch_status => $self->interval );
}

# Unimport Moose's sugar (has, etc.) so it does not linger in this package.
no Moose;

# Make the class immutable now that its definition is complete.
__PACKAGE__->meta->make_immutable;

=head1 AUTHOR

See L<Gearman::Driver>.

=head1 COPYRIGHT AND LICENSE

See L<Gearman::Driver>.

=head1 SEE ALSO

=over 4

=item * L<Gearman::Driver>

=item * L<Gearman::Driver::Adaptor>

=item * L<Gearman::Driver::Console>

=item * L<Gearman::Driver::Console::Basic>

=item * L<Gearman::Driver::Console::Client>

=item * L<Gearman::Driver::Job>

=item * L<Gearman::Driver::Job::Method>

=item * L<Gearman::Driver::Loader>

=item * L<Gearman::Driver::Worker>

=back

=cut

1;



( run in 3.837 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )