AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
# add to copy_pending
foreach my $s ( @{$fi->{dst}} ){
next if $job->{server_info}{$s}{has_files}{$fi->{filename}};
my $c = AC::MrGamoo::Job::XferInfo->new( $job,
id => unique(),
filename => $fi->{filename},
dst => $s,
);
next unless $c;
$c->pend($job);
debug(" => pending copy for $s");
}
}
}
sub failed {
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
my $server = $me->{server};
my $status = get_peer_status_from_id($server);
if( $status != 200 ){
# replan tasks
$job->_replan_server($server, 'task', $me);
return;
}
if( $me->{retries} ++ > $MAXRETRY ){
# replan tasks
$me->replan($job);
return;
}
# retry
debug("retry task");
$me->pend($job);
}
################################################################
sub replan {
my $me = shift;
my $job = shift;
return if $me->{replaced};
return $job->abort( reason => "too many failed tasks. out of replan options.")
if $me->{replaces};
return $me->_replan_altserver($job) if $me->{altserver};
if( $me->{phase} eq 'reduce' ){
verbose("cannot replan task. no altserver");
$job->abort(reason => "cannot replan task. no alternate server available");
return;
}
$me->_replan_map($job);
}
sub _replan_altserver {
my $me = shift;
my $job = shift;
$me->{server} = $me->{altserver};
delete $me->{retries};
delete $me->{altserver};
debug("replanning task to new server");
$me->pend($job);
}
sub _replan_map {
my $me = shift;
my $job = shift;
# remove task
# divy files among servers
# create new tasks
# rediddle next phase
my %newplan; # server => @files
$me->{replaced} = 1;
unless( $me->{altplan} ){
verbose("no alt task available - aborting");
$job->abort(reason => "cannot replan task. no alternate available");
return;
}
# divy files
for my $f (@{$me->{infile}}){
# alt loc for this file?
my $loc = $me->{altplan}{$f};
unless($loc){
verbose("file unavailable - aborting ($f)");
$job->abort(reason => "file unavailable: $f");
return;
}
push @{$newplan{$loc}}, $f;
}
my @new;
for my $as (keys %newplan){
my $newid = unique();
my $oldid = $me->{id};
my $new = AC::MrGamoo::Job::TaskInfo->new($job,
id => $newid,
phase => $me->{phase},
infile => $newplan{$as},
replaces => $oldid,
outfile => [ map {
(my $f = $_->{filename}) =~ s/$oldid/$newid/;
{ dst => $_->{dst}, filename => $f, }
} @{$me->{outfile}} ],
server => $as,
);
debug("replan map $oldid => $newid on $as");
# keep plan up to date
$job->{plan}{taskidx}{$newid} = $new;
push @{$job->{plan}{taskplan}[0]{task}}, $new;
# move to pending queue
$new->pend($job) if $job->{phase_no} == 0;
push @new, $new;
}
$me->_replan_replace_files( $job, @new );
}
sub _replan_replace_files {
my $me = shift;
my $job = shift;
my @new = shift;
my $oldid = $me->{id};
my $curphase = 0; # map
my $nxtphase = 1; # reduce/0
# remove old task's files, add new tasks' files
for my $ti ( @{$job->{plan}{taskplan}[$nxtphase]{task}} ){
my @infile;
for my $file (@{$ti->{infile}}){
if( $file =~ /$oldid/ ){
for my $new (@new){
my $newid = $new->{id};
(my $n = $file) =~ s/$oldid/$newid/;
push @infile, $n;
}
}else{
push @infile, $file;
}
}
$ti->{infile} = \@infile;
}
}
1;
( run in 1.102 second using v1.01-cache-2.11-cpan-f56aa216473 )