AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


our %REGISTRY;
our $MSGID = $$;
my $_trying;

our $MAXFILE = `sh -c "ulimit -n"`;
$MAXFILE = 255 if $^O eq 'solaris' && $MAXFILE > 255;

################################################################

# schedule periodic "cronjob"
AC::DC::Sched->new(
    info	=> "job periodic",
    freq	=> 2,
    func	=> \&periodic,
   );

################################################################

sub new {
    my $class = shift;
    # %{ APCMRMJobCreate }

    my $me = bless {
        request		=> { @_ },
        phase_no	=> -1,
        file_info	=> {},
        tmp_file	=> [],
        server_info	=> {},
        task_running	=> {},
        task_pending	=> {},
        xfer_running	=> {},
        xfer_pending	=> {},
        request_running	=> {},
        request_pending	=> {},
        statistics	=> { job_start => time() },
    }, $class;

    if( $REGISTRY{ $me->{request}{jobid} } ){
        verbose("ignoring duplicate request job $me->{request}{jobid}");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{ $me->{request}{jobid} };
    }

    verbose("new job: $me->{request}{jobid} ($me->{request}{traceinfo})");

    my $cf = $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};

    # open connection  to eu-console
    $me->{euconsole} = AC::MrGamoo::EUConsole->new( $me->{request}{jobid}, $me->{request}{console} );

    # partially compile
    eval {
        $me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list
    my $files = get_file_list( $cf );
    #print STDERR "files: ", dumper($files), "\n";

    for my $f (@$files){
        $me->{file_info}{ $f->{filename} } = $f;
    }

    # get server list
    my $servers = get_peer_list( $cf );
    #print STDERR "servers: ", dumper($servers), "\n";

    # plan job
    my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
    #print STDERR "plan: ", dumper($plan), "\n";

    $me->{plan} = $plan;

    $me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});

    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();
    $REGISTRY{ $me->{request}{jobid} } = $me;
    return $me;
}

sub start {
    my $me = shift;

    debug("start job");
    $me->{euconsole}->send_msg('debug', 'starting job');
    $me->_try_to_do_something();
    1;
}

################################################################

# record status rcvd from task
sub task_status {
    my $me = _find(shift, @_);
    my %p  = @_;
    my $taskid = $p{taskid};

    return unless $me;
    my $t = $me->{task_running}{$taskid};
    return unless $t;

    $t->update_status( $me, $p{phase}, $p{progress} );

    1;
}

# record status rcvd from file xfer
sub xfer_status {
    my $me = _find(shift, @_);
    my %p  = @_;
    my $copyid = $p{copyid};



( run in 0.589 second using v1.01-cache-2.11-cpan-5837b0d9d2c )