App-madeye

 view release on metacpan or  search on metacpan

lib/App/MadEye/Plugin/Worker/Gearman.pm  view on Meta::CPAN

package App::MadEye::Plugin::Worker::Gearman;
use strict;
use warnings;
use base qw/App::MadEye::Plugin::Base/;
use Gearman::Worker;
use Gearman::Client;
use App::MadEye::Util;
use Params::Validate;
use English;
use App::MadEye::Util;
use POSIX ":sys_wait_h";
use Storable qw/freeze thaw/;
use YAML;
use Scalar::Util qw/weaken/;

__PACKAGE__->mk_accessors(qw/task_set child_pids gearman_client/);

our $CHILD_TIMEOUT = 60;  # TODO: configurable

sub new {
    my $class = shift;
    my $self = $class->SUPER::new(@_);

    my $gearman_client = $self->get_gearman_client;
    $self->gearman_client( $gearman_client );
    my $task_set = $gearman_client->new_task_set;
    $self->task_set( $task_set );

    $self;
}

sub before_run_jobs : Hook {
    my ($self, $context) = @_;

    my @child_pids = $self->_run_workers($context);
    $self->child_pids(\@child_pids);

    $SIG{__DIE__} = sub {
        $self->kill_workers();
    };
}

sub _run_workers {
    my ($self, $context) = @_;

    my $parent_pid = $PID;
    my @child_pids;
    for my $i ( 0 .. $self->config->{config}->{fork_num}- 1 ) {
        my $pid = fork();
        if ($pid) {
            # parent process
            push @child_pids, $pid;
        } elsif ( defined $pid ) {
            # child process
            $context->log('debug', "start worker $i($parent_pid)");
            $self->run_worker($context, $parent_pid);
        } else {
            die "Cannot fork: $!";
        }
    }
    return wantarray ? @child_pids : \@child_pids;
}

sub run_job :Method {
    my ($self, $context, $args) = @_;

    $self->task_set->add_task(
        'watch',
        freeze($args), +{
            timeout => $self->task_timeout(),
            on_fail => sub {
                warn "GEARMAN ERROR: " . Dump($args);
            },
            on_complete => sub {



( run in 2.561 seconds using v1.01-cache-2.11-cpan-99c4e6809bf )