App-madeye
view release on metacpan or search on metacpan
lib/App/MadEye/Plugin/Worker/Gearman.pm view on Meta::CPAN
package App::MadEye::Plugin::Worker::Gearman;
use strict;
use warnings;
use base qw/App::MadEye::Plugin::Base/;
use Gearman::Worker;
use Gearman::Client;
use App::MadEye::Util;
use Params::Validate;
use English;
use App::MadEye::Util;
use POSIX ":sys_wait_h";
use Storable qw/freeze thaw/;
use YAML;
use Scalar::Util qw/weaken/;
__PACKAGE__->mk_accessors(qw/task_set child_pids gearman_client/);
our $CHILD_TIMEOUT = 60; # TODO: configurable
sub new {
my $class = shift;
my $self = $class->SUPER::new(@_);
my $gearman_client = $self->get_gearman_client;
$self->gearman_client( $gearman_client );
my $task_set = $gearman_client->new_task_set;
$self->task_set( $task_set );
$self;
}
sub before_run_jobs : Hook {
my ($self, $context) = @_;
my @child_pids = $self->_run_workers($context);
$self->child_pids(\@child_pids);
$SIG{__DIE__} = sub {
$self->kill_workers();
};
}
sub _run_workers {
my ($self, $context) = @_;
my $parent_pid = $PID;
my @child_pids;
for my $i ( 0 .. $self->config->{config}->{fork_num}- 1 ) {
my $pid = fork();
if ($pid) {
# parent process
push @child_pids, $pid;
} elsif ( defined $pid ) {
# child process
$context->log('debug', "start worker $i($parent_pid)");
$self->run_worker($context, $parent_pid);
} else {
die "Cannot fork: $!";
}
}
return wantarray ? @child_pids : \@child_pids;
}
sub run_job :Method {
my ($self, $context, $args) = @_;
$self->task_set->add_task(
'watch',
freeze($args), +{
timeout => $self->task_timeout(),
on_fail => sub {
warn "GEARMAN ERROR: " . Dump($args);
},
on_complete => sub {
( run in 2.561 seconds using v1.01-cache-2.11-cpan-99c4e6809bf )