AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


    # partially compile
    eval {
        $me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list
    my $files = get_file_list( $cf );
    #print STDERR "files: ", dumper($files), "\n";

    for my $f (@$files){
        $me->{file_info}{ $f->{filename} } = $f;
    }

    # get server list
    my $servers = get_peer_list( $cf );
    #print STDERR "servers: ", dumper($servers), "\n";

    # plan job
    my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
    #print STDERR "plan: ", dumper($plan), "\n";

    $me->{plan} = $plan;

    $me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});

    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();
    $REGISTRY{ $me->{request}{jobid} } = $me;
    return $me;
}

# Begin running this job.
#
# Emits a local debug line, sends a 'starting job' debug message through
# the job's euconsole object, and then hands control to
# _try_to_do_something() (defined elsewhere in this file).
#
# Always returns 1.
sub start {
    my ($me) = @_;

    debug("start job");
    $me->{euconsole}->send_msg( 'debug', 'starting job' );

    $me->_try_to_do_something();

    return 1;
}

################################################################

# Record a status report received from a task.
#
# The first argument, together with the remaining key/value arguments, is
# passed to _find() to obtain the job (presumably a lookup by job id --
# confirm against _find). The remaining arguments are treated as a hash;
# the keys read here are:
#   taskid   - key into the job's task_running table
#   phase    - forwarded to the task's update_status()
#   progress - forwarded to the task's update_status()
#
# Returns nothing when no job is found or the task is not currently in
# task_running; returns 1 after the task has been updated.
sub task_status {
    my $me   = _find(shift, @_);
    my %args = @_;
    my $id   = $args{taskid};

    # no such job: nothing to record
    return unless $me;

    # only tasks currently listed as running are updated
    my $task = $me->{task_running}{$id};
    return unless $task;

    $task->update_status( $me, $args{phase}, $args{progress} );

    return 1;
}

# Record a status report received from a file transfer.
#
# The first argument, together with the remaining key/value arguments, is
# passed to _find() to obtain the job (presumably a lookup by job id --
# confirm against _find). The remaining arguments are treated as a hash;
# the keys read here are:
#   copyid      - key into the job's xfer_running table
#   status_code - forwarded to the transfer's update_status()
#
# Returns nothing when no job is found or the transfer is not currently in
# xfer_running; returns 1 after the transfer has been updated.
sub xfer_status {
    my $me   = _find(shift, @_);
    my %args = @_;
    my $id   = $args{copyid};

    # no such job: nothing to record
    return unless $me;

    # only transfers currently listed as running are updated
    my $xfer = $me->{xfer_running}{$id};
    return unless $xfer;

    $xfer->update_status( $me, $args{status_code} );

    return 1;
}

################################################################

# Periodic housekeeping entry point (takes no arguments).
#
# Resets the file-level $_trying variable to 0, then invokes _periodic()
# on every job object currently stored in %REGISTRY.
sub periodic {
    # debug("periodic check");

    $_trying = 0;

    foreach my $registered_job ( values %REGISTRY ) {
        $registered_job->_periodic();
    }
}

sub _periodic {
    my $me = shift;

    my @t = values %{$me->{task_running}};
    for my $t ( @t ){
        my $lt = $t->{status_time} || $t->{start_time};
        next if $^T - $lt < ($me->{options}{tasktimeout} || $TASKTIMEOUT);

        $t->failed( $me, 'timeout' );
    }

    my @c = values %{$me->{xfer_running}};
    for my $c ( @c ){
        my $lt = $c->{status_time} || $c->{start_time};
        next if $^T - $lt < ($me->{options}{xfertimeout} || $XFERTIMEOUT);

        $c->failed( $me, 'timeout' );
    }

    $me->_try_to_do_something();

    my $tr = keys %{ $me->{task_running} };
    my $tp = keys %{ $me->{task_pending} };
    my $xr = keys %{ $me->{xfer_running} };
    my $xp = keys %{ $me->{xfer_pending} };
    my $rr = keys %{ $me->{request_running} };
    my $rp = keys %{ $me->{request_pending} };

    my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';



( run in 1.442 second using v1.01-cache-2.11-cpan-fe3c2283af0 )