Gearman-Driver
view release on metacpan or search on metacpan
lib/Gearman/Driver.pm view on Meta::CPAN
See also: L<prefix|Gearman::Driver::Worker/prefix>
=head1 ATTRIBUTES
See also L<Gearman::Driver::Loader/ATTRIBUTES>.
=head2 server
A list of Gearman servers the workers should connect to. The format
for the server list is: C<host[:port][,host[:port]]>
See also: L<Gearman::XS>
=over 4
=item * default: C<localhost:4730>
=item * isa: C<Str>
=back
=cut
has 'server' => (
default => 'localhost:4730',
documentation => 'Gearman host[:port][,host[:port]]',
is => 'rw',
isa => 'Str',
required => 1,
);
=head2 console_port
Gearman::Driver has a telnet management console, see also:
L<Gearman::Driver::Console>
=over 4
=item * default: C<47300>
=item * isa: C<Int>
=back
Set this to C<0> to disable management console at all.
=cut
has 'console_port' => (
default => 47300,
documentation => 'Port of management console (default: 47300)',
is => 'rw',
isa => 'Int',
required => 1,
);
=head2 interval
Each n seconds L<Net::Telnet::Gearman> is used in
L<Gearman::Driver::Observer> to check status of free/running/busy
workers on gearmand. This is used to fork more workers depending
on the queue size and the MinProcesses/MaxProcesses
L<attribute|Gearman::Driver::Worker/METHODATTRIBUTES> of the
job method. See also: L<Gearman::Driver::Worker>
=over 4
=item * default: C<5>
=item * isa: C<Int>
=back
=cut
has 'interval' => (
default => '5',
documentation => 'Interval in seconds (see Gearman::Driver::Observer)',
is => 'rw',
isa => 'Int',
required => 1,
);
=head2 max_idle_time
Whenever L<Gearman::Driver::Observer> notices that there are more
processes running than actually necessary (depending on min_processes
and max_processes setting) it will kill them. By default this happens
immediately. If you change this value to C<300>, a process which is
not necessary is killed after 300 seconds.
Please remember that this also depends on what value you set
L</interval> to. The max_idle_time is only checked each n seconds
where n is L</interval>. Besides that it makes only sense when you
have workers where L<Gearman::Driver::Worker/MinProcesses> is set to
C<0>.
=over 4
=item * default: C<0>
=item * isa: C<Int>
=back
=cut
has 'max_idle_time' => (
default => '0',
documentation => 'How many seconds a worker may be idle before its killed',
is => 'rw',
isa => 'Int',
required => 1,
);
=head2 logfile
Path to logfile.
=over 4
lib/Gearman/Driver.pm view on Meta::CPAN
=item * isa: C<Str>
=item * default: C<[%d] %p %m%n>
=back
=cut
has 'loglayout' => (
default => '[%d] %p %m%n',
documentation => 'Log message layout (default: [%d] %p %m%n)',
is => 'rw',
isa => 'Str',
);
=head2 loglevel
See also L<Log::Log4perl>.
=over 4
=item * isa: C<Str>
=item * default: C<INFO>
=back
=cut
has 'loglevel' => (
default => 'INFO',
documentation => 'Log level (default: INFO)',
is => 'rw',
isa => 'Str',
);
=head2 unknown_job_callback
Whenever L<Gearman::Driver::Observer> sees a job that isnt handled
it will call this CodeRef, passing following arguments:
=over 4
=item * C<$driver>
=item * C<$status>
=back
my $driver = Gearman::Driver->new(
namespaces => [qw(My::Workers)],
unknown_job_callback => sub {
my ( $driver, $status ) = @_;
# notify nagios here for example
}
);
C<$status> might look like:
$VAR1 = {
'busy' => 0,
'free' => 0,
'name' => 'GDExamples::Convert::unknown_job',
'queue' => 6,
'running' => 0
};
=cut
has 'unknown_job_callback' => (
default => sub {
sub { }
},
is => 'rw',
isa => 'CodeRef',
traits => [qw(NoGetopt)],
);
=head2 worker_options
You can pass runtime options to the worker module, these will merged with 'GLOBAL' and pass to the worker constructor. ( worker options override globals )
=over 4
=item * default: C<{}>
=item * isa: C<HashRef>
=back
Example:
my $driver = Gearman::Driver->new(
namespaces => [qw(My::Workers)],
worker_options => {
'GLOBAL' => {
'config' => $config,
},
'My::Workers::MysqlPing' => {
'dsn' => 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;',
},
'My::Workers::ImageThumbnail' => {
'default_format' => 'jpeg',
'default_size => ' 133 x 100 ',
}
}
);
You should define these in a runtime config (See also L</configfile>), might be:
---
worker_options:
'My::App::Worker::MysqlPing':
'dsn': 'DBI:mysql:database=test;host=localhost;mysql_auto_reconnect=1;mysql_enable_utf8=1;mysql_server_prepare=1;'
'user': 'root'
'password:': ''
'My::App::Worker::ImageThumbnail':
'default_format': 'jpeg'
'default_size': '133x100'
=cut
lib/Gearman/Driver.pm view on Meta::CPAN
Returns the job instance.
=cut
sub BUILD {
my ($self) = @_;
$self->_setup_logger;
}
sub _setup_logger {
my ($self) = @_;
unless (Log::Log4perl->initialized()) {
Log::Log4perl->easy_init(
{
file => sprintf( '>>%s', $self->logfile ),
layout => $self->loglayout,
level => $self->loglevel,
},
);
}
}
sub _start_observer {
my ($self) = @_;
if ( $self->interval > 0 ) {
$self->{observer} = Gearman::Driver::Observer->new(
callback => sub {
my ($response) = @_;
$self->_observer_callback($response);
},
interval => $self->interval,
server => $self->server,
);
}
}
sub _start_console {
my ($self) = @_;
if ( $self->console_port > 0 ) {
$self->{console} = Gearman::Driver::Console->new(
driver => $self,
port => $self->console_port,
);
}
}
sub _observer_callback {
my ( $self, $response ) = @_;
# When $job->add_process is called and ProcessGroup is used
# this may end up in a race condition and more processes than
# wanted are started. To fix that we remember what kind of
# processes we need to start in each single run of this callback.
my %to_start = ();
my $status = $response->{data};
foreach my $row (@$status) {
if ( my $job = $self->_find_job( $row->{name} ) ) {
$to_start{$job->name} ||= 0;
if ( $job->count_processes <= $row->{busy} && $row->{queue} ) {
my $diff = $row->{queue} - $row->{busy};
my $free = $job->max_processes - $job->count_processes;
if ($free) {
my $start = $diff > $free ? $free : $diff;
$to_start{$job->name} += $start;
}
}
elsif ( $job->count_processes && $job->count_processes > $job->min_processes && $row->{queue} == 0 ) {
my $idle = time - $job->lastrun;
if ( $job->lastrun && ($idle >= $self->max_idle_time) ) {
my $stop = $job->count_processes - $job->min_processes;
$self->log->debug( sprintf "Stopping %d process(es) of type %s (idle: %d)",
$stop, $job->name, $idle );
$job->remove_process for 1 .. $stop;
}
}
}
else {
$self->unknown_job_callback->( $self, $row ) if $row->{queue} > 0;
}
}
foreach my $name (keys %to_start) {
my $job = $self->get_job($name);
my $start = $to_start{$name};
my $free = $job->max_processes - $job->count_processes;
$start = $free if $start > $free;
if ($start) {
$self->log->debug( sprintf "Starting %d new process(es) of type %s", $start, $job->name );
$job->add_process for 1 .. $start;
}
}
my $error = $response->{error};
foreach my $e (@$error) {
$self->log->error( sprintf "Gearman::Driver::Observer: %s", $e );
}
}
sub _find_job {
my ( $self, $name ) = @_;
foreach my $job ( $self->all_jobs ) {
foreach my $method ( @{ $job->methods } ) {
return $job if $method->name eq $name;
}
}
return 0;
}
sub _start_session {
my ($self) = @_;
$self->{session} = POE::Session->create(
object_states => [
$self => {
_start => '_start',
got_sig => '_on_sig',
monitor_processes => '_monitor_processes',
}
]
);
( run in 3.789 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )