AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
# partially compile
eval {
$me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
};
if(my $e = $@){
problem("cannot compile job: $e");
return;
}
# RSN - get_file_list + Plan may take too long - do in sub-process
# get file list
my $files = get_file_list( $cf );
#print STDERR "files: ", dumper($files), "\n";
for my $f (@$files){
$me->{file_info}{ $f->{filename} } = $f;
}
# get server list
my $servers = get_peer_list( $cf );
#print STDERR "servers: ", dumper($servers), "\n";
# plan job
my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
#print STDERR "plan: ", dumper($plan), "\n";
$me->{plan} = $plan;
$me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});
$me->{server_info}{$_->{id}} = {} for @$servers;
$me->_preload_file_copies();
$REGISTRY{ $me->{request}{jobid} } = $me;
return $me;
}
sub start {
my $me = shift;
debug("start job");
$me->{euconsole}->send_msg('debug', 'starting job');
$me->_try_to_do_something();
1;
}
################################################################
# record status rcvd from task
sub task_status {
my $me = _find(shift, @_);
my %p = @_;
my $taskid = $p{taskid};
return unless $me;
my $t = $me->{task_running}{$taskid};
return unless $t;
$t->update_status( $me, $p{phase}, $p{progress} );
1;
}
# record status rcvd from file xfer
sub xfer_status {
my $me = _find(shift, @_);
my %p = @_;
my $copyid = $p{copyid};
return unless $me;
my $c = $me->{xfer_running}{$copyid};
return unless $c;
$c->update_status( $me, $p{status_code} );
1;
}
################################################################
sub periodic {
# debug("periodic check");
$_trying = 0;
for my $job (values %REGISTRY){
$job->_periodic();
}
}
sub _periodic {
my $me = shift;
my @t = values %{$me->{task_running}};
for my $t ( @t ){
my $lt = $t->{status_time} || $t->{start_time};
next if $^T - $lt < ($me->{options}{tasktimeout} || $TASKTIMEOUT);
$t->failed( $me, 'timeout' );
}
my @c = values %{$me->{xfer_running}};
for my $c ( @c ){
my $lt = $c->{status_time} || $c->{start_time};
next if $^T - $lt < ($me->{options}{xfertimeout} || $XFERTIMEOUT);
$c->failed( $me, 'timeout' );
}
$me->_try_to_do_something();
my $tr = keys %{ $me->{task_running} };
my $tp = keys %{ $me->{task_pending} };
my $xr = keys %{ $me->{xfer_running} };
my $xp = keys %{ $me->{xfer_pending} };
my $rr = keys %{ $me->{request_running} };
my $rp = keys %{ $me->{request_pending} };
my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';
( run in 1.442 second using v1.01-cache-2.11-cpan-fe3c2283af0 )