AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
for my $s (@$redbin){
my $id = unique();
push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
phase => "reduce/$rno",
server => $s->[0],
altserver => $s->[1],
infile => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
outfile => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
);
$sn++;
}
return \@reds;
}
# Plan the single 'final' task of a job.
#
# $job    - the job object; passed straight through to TaskInfo->new.
# $redbin - arrayref of server bins; only the first bin is consulted:
#           its element 0 becomes the task's server, element 1 its
#           altserver.
# $ptasks - arrayref of tasks from the preceding phase; element 0 of each
#           task's {outfile} list supplies one input filename.
#
# Returns an arrayref containing exactly one AC::MrGamoo::Job::TaskInfo
# with phase 'final' and an empty output-file list.
sub _plan_final {
    my $job    = shift;
    my $redbin = shift;
    my $ptasks = shift;

    my $id = unique();

    return [
        AC::MrGamoo::Job::TaskInfo->new( $job,
            id        => $id,
            server    => $redbin->[0][0],
            altserver => $redbin->[0][1],
            phase     => 'final',
            # NOTE(review): only output partition 0 of each previous-phase
            # task is read; presumably those tasks emit a single output
            # file when a final phase follows -- confirm against the planner.
            infile    => [ map { $_->{outfile}[0]{filename} } @$ptasks ],
            outfile   => [ ],
        ),
    ];
}
# Describe the output files a task will write, one per output partition.
#
# $job    - job object; its {request}{jobid} names the temp directory.
# $taskid - id of the task producing the files.
# $nout   - number of output partitions; partitions are 0 .. $nout-1.
# $redbin - arrayref of server-list arrayrefs; partition $n is sent to a
#           (shallow) copy of the servers in $redbin->[$n].
# $pfix   - filename prefix distinguishing the producing phase.
#
# Returns an arrayref of { filename => "mrtmp/j_<jobid>/<pfix>_<taskid>_<n>",
# dst => [servers...] } hashrefs, in partition order.
sub _plan_outfiles {
    my($job, $taskid, $nout, $redbin, $pfix) = @_;

    my $dir = "mrtmp/j_$job->{request}{jobid}";

    return [
        map {
            # leading + forces an anonymous hash rather than a bare block
            +{
                filename => "$dir/${pfix}_${taskid}_$_",
                dst      => [ @{ $redbin->[$_] } ],
            }
        } 0 .. $nout - 1
    ];
}
# Decide which servers may run map tasks.
#
# $job     - job object; $job->{options}{maps}, when it is a positive
#            number, caps how many servers are used (a fractional value
#            is truncated toward zero). An unset, zero, negative or
#            non-numeric value means "no cap".
# $servers - arrayref of hashrefs, each with {id} and {metric}.
#
# Servers are ranked by ascending {metric}; the first N in that order are
# marked usable. Returns a hashref keyed by server id:
#   { metric => <server metric>, use => 1 | 0 }
sub _plan_map_these_servers {
    my $job     = shift;
    my $servers = shift;

    # limit number of servers?
    my $want = $job->{options}{maps};
    # defined() avoids an "uninitialized" warning when the option is
    # absent; int() keeps a fractional cap meaningful (e.g. 2.5 -> 2)
    # instead of letting it count past zero and silently disable the cap.
    $want = defined($want) ? int($want + 0) : 0;
    my $nm = $want > 0 ? $want : scalar(@$servers);

    my @ranked = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %data;
    for my $i (0 .. $#ranked){
        my $s = $ranked[$i];
        $data{ $s->{id} } = { metric => $s->{metric}, use => ($i < $nm ? 1 : 0) };
    }
    return \%data;
}
sub _plan_divy_files {
my $job = shift;
my $files = shift;
my $servers = shift;
my %filemap;
my %bytes;
my @copies;
my $load = _plan_map_these_servers( $job, $servers );
# divy files up among servers
for my $f (sort { $b->{size} <=> $a->{size} } @$files){
my($best_wgt, $best_loc);
for my $loc ( @{$f->{location}} ){
next unless exists $load->{$loc}; # down?
next unless $load->{$loc}{use};
my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
if( !$best_loc || $w < $best_wgt ){
$best_wgt = $w;
$best_loc = $loc;
}
}
if( $best_loc ){
# a server has the file. process it there.
push @{$filemap{$best_loc}}, $f;
$bytes{$best_loc} += $f->{size};
next;
}
# pick best 2 servers
my($sa, $sb) =
map { $_->[1] }
sort{ $a->[0] <=> $b->[0] }
map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
grep { $load->{$_}{use} }
(keys %$load);
# copy the file
my @loc = $sa;
push @loc, $sb if $sb;
my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");
my $ff = {
filename => $newfile,
location => \@loc,
size => $f->{size},
};
push @{$filemap{$sa}}, $ff;
$bytes{$sa} += $ff->{size};
# need to copy this file from its current location to the server(s) that will run map on it
for my $loc (@loc){
push @copies, AC::MrGamoo::Job::XferInfo->new( $job,
id => unique(),
filename => $f->{filename},
( run in 0.457 second using v1.01-cache-2.11-cpan-39bf76dae61 )