Gearman-Driver

 view release on metacpan or  search on metacpan

lib/Gearman/Driver.pm  view on Meta::CPAN

See also: L<prefix|Gearman::Driver::Worker/prefix>

=head1 ATTRIBUTES

See also L<Gearman::Driver::Loader/ATTRIBUTES>.

=head2 server

A list of Gearman servers the workers should connect to. The format
for the server list is: C<host[:port][,host[:port]]>

See also: L<Gearman::XS>

=over 4

=item * default: C<localhost:4730>

=item * isa: C<Str>

=back

=cut

# Gearman server list the workers connect to, given as one string in the
# form host[:port][,host[:port]].
has server => (
    is            => 'rw',
    isa           => 'Str',
    required      => 1,
    default       => 'localhost:4730',
    documentation => 'Gearman host[:port][,host[:port]]',
);

=head2 console_port

Gearman::Driver has a telnet management console, see also:

L<Gearman::Driver::Console>

=over 4

=item * default: C<47300>

=item * isa: C<Int>

=back

Set this to C<0> to disable the management console entirely.

=cut

# Port of the management console. _start_console only creates the console
# when this value is greater than zero.
has console_port => (
    is            => 'rw',
    isa           => 'Int',
    required      => 1,
    default       => 47300,
    documentation => 'Port of management console (default: 47300)',
);

=head2 interval

Every n seconds L<Net::Telnet::Gearman> is used in
L<Gearman::Driver::Observer> to check status of free/running/busy
workers on gearmand. This is used to fork more workers depending
on the queue size and the MinProcesses/MaxProcesses
L<attribute|Gearman::Driver::Worker/METHODATTRIBUTES> of the
job method. See also: L<Gearman::Driver::Worker>

=over 4

=item * default: C<5>

=item * isa: C<Int>

=back

=cut

# Polling interval in seconds that is handed to Gearman::Driver::Observer.
# _start_observer only creates the observer when this value is greater than
# zero. The default is written as a plain integer so that it matches the
# declared Int type.
has 'interval' => (
    default       => 5,
    documentation => 'Interval in seconds (see Gearman::Driver::Observer)',
    is            => 'rw',
    isa           => 'Int',
    required      => 1,
);

=head2 max_idle_time

Whenever L<Gearman::Driver::Observer> notices that there are more
processes running than actually necessary (depending on min_processes
and max_processes setting) it will kill them. By default this happens
immediately. If you change this value to C<300>, a process which is
not necessary is killed after 300 seconds.

Please remember that this also depends on what value you set
L</interval> to. The max_idle_time is only checked every n seconds,
where n is L</interval>. Besides that, it only makes sense when you
have workers where L<Gearman::Driver::Worker/MinProcesses> is set to
C<0>.

=over 4

=item * default: C<0>

=item * isa: C<Int>

=back

=cut

# Number of seconds a surplus worker process may stay idle before the
# observer callback removes it. With the default of 0 an idle surplus process
# is removed on the first check after it has run at least once. The default
# is written as a plain integer so that it matches the declared Int type.
has 'max_idle_time' => (
    default       => 0,
    documentation => q{How many seconds a worker may be idle before it's killed},
    is            => 'rw',
    isa           => 'Int',
    required      => 1,
);

=head2 logfile

Path to logfile.

=over 4

lib/Gearman/Driver.pm  view on Meta::CPAN

=item * isa: C<Str>

=item * default: C<[%d] %p %m%n>

=back

=cut

# Layout pattern for log messages; _setup_logger passes it to
# Log::Log4perl->easy_init as the "layout" option.
has loglayout => (
    is            => 'rw',
    isa           => 'Str',
    default       => '[%d] %p %m%n',
    documentation => 'Log message layout (default: [%d] %p %m%n)',
);

=head2 loglevel

See also L<Log::Log4perl>.

=over 4

=item * isa: C<Str>

=item * default: C<INFO>

=back

=cut

# Log level name; _setup_logger passes it to Log::Log4perl->easy_init as the
# "level" option.
has loglevel => (
    is            => 'rw',
    isa           => 'Str',
    default       => 'INFO',
    documentation => 'Log level (default: INFO)',
);

=head2 unknown_job_callback

Whenever L<Gearman::Driver::Observer> sees a job that isn't handled
it will call this CodeRef, passing the following arguments:

=over 4

=item * C<$driver>

=item * C<$status>

=back

    my $driver = Gearman::Driver->new(
        namespaces           => [qw(My::Workers)],
        unknown_job_callback => sub {
            my ( $driver, $status ) = @_;
            # notify nagios here for example
        }
    );

C<$status> might look like:

    $VAR1 = {
        'busy'    => 0,
        'free'    => 0,
        'name'    => 'GDExamples::Convert::unknown_job',
        'queue'   => 6,
        'running' => 0
    };

=cut

# Hook called by the observer callback as $cb->( $driver, $status_row ) for
# every queued function that no loaded job handles. The default is a no-op,
# so callers never have to check whether a callback was supplied.
has unknown_job_callback => (
    is      => 'rw',
    isa     => 'CodeRef',
    traits  => [qw(NoGetopt)],
    default => sub {
        return sub { };
    },
);

=head2 worker_options

You can pass runtime options to the worker module; these will be merged with 'GLOBAL' and passed to the worker constructor (worker options override globals).

=over 4

=item * default: C<{}>

=item * isa: C<HashRef>

=back

Example:

    my $driver = Gearman::Driver->new(
        namespaces     => [qw(My::Workers)],
        worker_options => {
            'GLOBAL' => {
                'config' => $config,
            },
            'My::Workers::MysqlPing' => {
                'dsn' => 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;',
            },
            'My::Workers::ImageThumbnail' => {
                'default_format' => 'jpeg',
                'default_size'   => '133x100',
            }
        }
    );

You should define these in a runtime config (See also L</configfile>), might be:

    ---
    worker_options:
        'My::App::Worker::MysqlPing':
            'dsn': 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;'
            'user': 'root'
            'password': ''
        'My::App::Worker::ImageThumbnail':
            'default_format': 'jpeg'
            'default_size': '133x100'

=cut

lib/Gearman/Driver.pm  view on Meta::CPAN

Returns the job instance.

=cut

# Object construction hook (BUILD is presumably invoked by Moose once the
# attributes are populated - confirm against the class header). Its only job
# is to make sure logging is configured.
sub BUILD {
    my $self = shift;
    return $self->_setup_logger;
}

# Configure Log::Log4perl from the logfile / loglayout / loglevel attributes.
#
# Nothing happens when Log4perl reports that it is already initialized, so an
# existing logging configuration is left alone. Otherwise easy_init is called
# with the logfile opened in append mode (the ">>" prefix).
sub _setup_logger {
    my $self = shift;

    Log::Log4perl->initialized()
        or Log::Log4perl->easy_init(
            {
                file   => '>>' . $self->logfile,
                layout => $self->loglayout,
                level  => $self->loglevel,
            },
        );
}

# Create the Gearman::Driver::Observer that polls the configured server.
#
# The observer is only created when the interval attribute is greater than
# zero. Every response it delivers is forwarded to _observer_callback. The
# observer object is kept in $self->{observer}.
sub _start_observer {
    my $self = shift;

    my $interval = $self->interval;
    if ( $interval > 0 ) {
        my $forward = sub {
            my $response = shift;
            return $self->_observer_callback($response);
        };

        $self->{observer} = Gearman::Driver::Observer->new(
            callback => $forward,
            interval => $interval,
            server   => $self->server,
        );
    }
}

# Launch the management console unless it has been disabled.
#
# A console_port of 0 (or less) leaves the console off. Otherwise a
# Gearman::Driver::Console is constructed with that port and a reference to
# this driver, and kept in $self->{console}.
sub _start_console {
    my $self = shift;

    my $port = $self->console_port;
    if ( $port > 0 ) {
        $self->{console} = Gearman::Driver::Console->new(
            port   => $port,
            driver => $self,
        );
    }
}

# Observer callback: reconcile the number of worker processes of every job
# with the queue status delivered by Gearman::Driver::Observer.
#
# Arguments:
#   $response - hashref; two keys are read here:
#                 data  - arrayref of status rows, each a hashref with at
#                         least the keys name, queue and busy
#                 error - arrayref of error messages
#
# For each status row:
#   * a row that belongs to a known job whose processes are all busy while
#     work is queued books additional processes, capped by the remaining
#     headroom up to max_processes;
#   * a known job with an empty queue and more processes than min_processes
#     has the surplus removed once it has been idle for max_idle_time seconds;
#   * a row without a matching job triggers unknown_job_callback when its
#     queue is non-empty.
#
# Every entry in $response->{error} is logged at error level.
sub _observer_callback {
    my ( $self, $response ) = @_;

    # When $job->add_process is called and ProcessGroup is used
    # this may end up in a race condition and more processes than
    # wanted are started. To fix that we remember what kind of
    # processes we need to start in each single run of this callback.
    my %to_start = ();

    my $status = $response->{data};
    foreach my $row (@$status) {
        if ( my $job = $self->_find_job( $row->{name} ) ) {
            $to_start{ $job->name } ||= 0;
            if ( $job->count_processes <= $row->{busy} && $row->{queue} ) {
                my $diff = $row->{queue} - $row->{busy};
                my $free = $job->max_processes - $job->count_processes;

                # Both values must be strictly positive. $free is negative
                # when more processes are running than max_processes allows,
                # and a plain truth test would let that negative number
                # through. A negative $diff would likewise cancel starts
                # booked by another row of the same job.
                if ( $free > 0 && $diff > 0 ) {
                    my $start = $diff > $free ? $free : $diff;
                    $to_start{ $job->name } += $start;
                }
            }

            elsif ( $job->count_processes && $job->count_processes > $job->min_processes && $row->{queue} == 0 ) {
                my $idle = time - $job->lastrun;

                # A lastrun of 0 means the job never ran; such processes are
                # not reaped here.
                if ( $job->lastrun && ( $idle >= $self->max_idle_time ) ) {
                    my $stop = $job->count_processes - $job->min_processes;
                    $self->log->debug( sprintf "Stopping %d process(es) of type %s (idle: %d)",
                        $stop, $job->name, $idle );
                    $job->remove_process for 1 .. $stop;
                }
            }
        }
        else {
            $self->unknown_job_callback->( $self, $row ) if $row->{queue} > 0;
        }
    }

    foreach my $name ( keys %to_start ) {
        my $job   = $self->get_job($name);
        my $start = $to_start{$name};
        my $free  = $job->max_processes - $job->count_processes;

        # Re-apply the cap: several rows may have added to the same job.
        $start = $free if $start > $free;

        # The cap can turn $start negative when the job is already above
        # max_processes, so require a positive count instead of mere truth.
        # Otherwise a misleading "Starting -1 ..." line would be logged.
        if ( $start > 0 ) {
            $self->log->debug( sprintf "Starting %d new process(es) of type %s", $start, $job->name );
            $job->add_process for 1 .. $start;
        }
    }

    my $error = $response->{error};
    foreach my $e (@$error) {
        $self->log->error( sprintf "Gearman::Driver::Observer: %s", $e );
    }
}

# Look up the job that owns the method called $name.
#
# Walks all jobs in the order all_jobs returns them and hands back the first
# one with a method of that name. Returns 0 (not undef) when nothing matches.
sub _find_job {
    my ( $self, $name ) = @_;

    for my $candidate ( $self->all_jobs ) {
        return $candidate
            if grep { $_->name eq $name } @{ $candidate->methods };
    }

    return 0;
}

sub _start_session {
    my ($self) = @_;
    $self->{session} = POE::Session->create(
        object_states => [
            $self => {
                _start            => '_start',
                got_sig           => '_on_sig',
                monitor_processes => '_monitor_processes',
            }
        ]
    );



( run in 3.789 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )