AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Apr-22 11:12 (EDT)
# Function: remote tasks
#
# $Id: Task.pm,v 1.2 2011/01/14 22:38:06 jaw Exp $

package AC::MrGamoo::Job::Task;
use AC::MrGamoo::Debug 'job_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::MySelf;
use AC::Misc;
use Time::HiRes 'time';
use strict;

our @ISA = 'AC::MrGamoo::Job::Action';


sub new {
    my $class  = shift;
    my $job    = shift;
    my $info   = shift;
    my $server = shift;

    my $id = unique();

    my $me = bless {
        id	=> $id,
        info	=> $info,
        server	=> $server,
        created => time(),
    };

    $job->{task_pending}{$id} = $me;

    debug("  => pending task $info->{id} => $id on $server");

    return $me;
}

sub start {
    my $me  = shift;
    my $job = shift;

    my $server = $me->{server};
    debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");

    # send request to server
    my $ti = $me->{info};

    my $x = $job->_send_request( $server, "task $me->{id}", {
        type		=> 'mrgamoo_taskcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        taskid		=> $me->{id},
        jobsrc		=> $job->{request}{jobsrc},
        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},
        infile		=> $ti->{infile},
        outfile		=> [ map { $_->{filename} } @{$ti->{outfile}} ],
        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);
        return;
    }

    # no success cb here. we will either timeout, or get a TaskStatus msg.
    $x->set_callback('on_failure', \&_cb_start_task_fail, $me, $job );

    $me->started($job, 'task');
    $x->start();
}

sub _cb_start_task_fail {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $job = shift;

    $me->failed($job, 'network');
}

sub update_status {
    my $me  = shift;
    my $job = shift;
    my $phase = shift;
    my $progress = shift;

    $me->{status_time}  = $^T;
    $me->{status_phase} = $phase;
    $me->{status_amt}   = $progress;
    $me->{status_fail}  = 1 if $phase eq 'FAILED';

    debug("task is $phase $progress");

    if( $phase eq 'FINISHED' ){
        if( $me->{status_fail} ){
            $me->failed( $job, "status fail" );
        }else{
            $me->finished( $job );
        }
    }

    return 1;
}

sub failed {
    my $me   = shift;
    my $job  = shift;
    my $why  = shift;

    debug("task failed: $why $me->{status_time}");

    return if $job->something_failed();
    $me->SUPER::failed($job, 'task');
    $me->{info}->failed( $me, $job );
    if( $why eq 'timeout' ){
        $me->abort($job)
    }else{
        # $job->_try_to_do_something();
    }
}

sub finished {
    my $me   = shift;
    my $job  = shift;
    my $why  = shift;

    debug('task finish');

    $me->SUPER::finished($job, 'task');
    $me->{info}->finished( $me, $job );

    $job->_try_to_do_something();
}

sub abort {
    my $me  = shift;
    my $job = shift;

    debug("aborting task $me->{id}");

    AC::MrGamoo::Job::Request->new( $job,
        id	=> unique(),
        server	=> $me->{server},
        info	=> "abort task $me->{id}",
        proto	=> {
            type		=> 'mrgamoo_taskabort',
            msgidno		=> $^T,
            want_reply		=> 1,
        },
        request	=> {
            jobid		=> $job->{request}{jobid},
            taskid		=> $me->{id},
        },
    );

    delete $job->{"task_running"}{$me->{id}};
    delete $job->{server_info}{$me->{server}}{"task_running"}{$me->{id}};
}



1;



( run in 0.469 second using v1.01-cache-2.11-cpan-39bf76dae61 )