AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
# Work out how many reduce bins a job should use.
#   $config  - job config hashref; a non-zero {reduces} entry overrides the default
#   $nserver - number of servers, scaled by $REDUCEFACTOR to get the default
# Returns a whole number, never less than 1.
sub _number_of_reduces {
    my $config  = shift;
    my $nserver = shift;

    # "|| 0" lets a missing setting count as 0 without adding undef to a number
    my $nr = ($config->{reduces} || 0) + 0;
    $nr ||= $nserver * $REDUCEFACTOR;

    # truncate only after the value has been chosen: a fractional setting such
    # as 2.7 becomes 2, and anything that ends up below 1 (0.5, negatives, a
    # zero default) is raised to 1
    $nr = int $nr;
    $nr = 1 if $nr < 1;
    return $nr;
}
# Give every reduce bin a primary server and, where possible, an alternate.
#   $nr      - number of bins to fill
#   $servers - arrayref of server records; only their {id} is read here
# Bin N takes server N (wrapping round the list) as primary and the following
# server as alternate; with a single server a bin has no alternate entry.
# Returns an arrayref of [ primary_id, alt_id ] bins, after handing it to shuffle().
sub _pick_reduce_bins {
    my( $nr, $servers ) = @_;

    my $nserver = @$servers;

    my @redbin = map {
        my @pair = ( $servers->[ $_ % $nserver ]{id} );
        # pick alt location
        push @pair, $servers->[ ($_ + 1) % $nserver ]{id} if $nserver > 1;
        \@pair;
    } 0 .. $nr - 1;

    shuffle(\@redbin);
    return \@redbin;
}
# Build the map-phase tasks for a job.
#   $job     - job object; passed on to the helpers and to every TaskInfo
#   $servers - candidate servers, passed to _plan_divy_files
#   $files   - input files, passed to _plan_divy_files
#   $nr      - number of reduce bins, passed to _plan_outfiles
#   $redbin  - reduce bin table, passed to _plan_outfiles
# Returns ( \@maptask, $copies ); $copies is the second value that
# _plan_divy_files returned, passed back untouched.
sub _plan_map {
    my( $job, $servers, $files, $nr, $redbin ) = @_;

    # plan map
    #   divy files among servers
    #   split server + files into tasks
    my( $filemap, $copies ) = _plan_divy_files( $job, $files, $servers );

    my @maptask;
    for my $s (keys %$filemap){
        my $totalsize = 0;
        $totalsize += $_->{size} for @{$filemap->{$s}};

        # per-task size target: this server's total / $MAPTARGETMIN,
        # capped at $MAPSIZELIMIT
        my $sizelimit = $totalsize / $MAPTARGETMIN;
        $sizelimit = $MAPSIZELIMIT if $sizelimit > $MAPSIZELIMIT;

        # hand the files out largest first
        my @todo = sort { $b->{size} <=> $a->{size} } @{$filemap->{$s}};

        while( @todo ){
            my @file;
            my %alt;
            my $tot = 0;

            # Every task takes at least one file. When all sizes are 0 the
            # limit is 0 as well; a plain "$tot < $sizelimit" test would then
            # never consume a file and the outer loop would never drain @todo.
            while( @todo && (!@file || $tot < $sizelimit) ){
                my $f = shift @todo;
                $tot += $f->{size};
                push @file, $f->{filename};

                # backup plan?
                my $as = $f->{location}[1];
                $alt{$f->{filename}} = $as if $as;
            }

            my $id = unique();
            push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
                id      => $id,
                phase   => 'map',
                server  => $s,
                infile  => \@file,
                altplan => \%alt,
                _total  => $tot,
                outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
            );
        }
    }

    return (\@maptask, $copies);
}
# Build the tasks for reduce pass $rno: one task per reduce bin.
#   $job    - job object; passed on to TaskInfo->new and _plan_outfiles
#   $rno    - number of this reduce pass; appears in the phase name
#             ("reduce/$rno") and in the tag given to _plan_outfiles ("red$rno")
#   $nout   - output count passed to _plan_outfiles
#   $redbin - arrayref of [ server, altserver ] bins
#   $ptasks - tasks of the preceding phase; output slot N of each of them
#             becomes an input file of the task for bin N
# Returns an arrayref holding one TaskInfo->new result per bin, in bin order.
sub _plan_reduce {
    my( $job, $rno, $nout, $redbin, $ptasks ) = @_;

    my @reds;

    # iterate by index: the bin number also selects the matching output slot
    # of every preceding task
    for my $sn (0 .. $#$redbin){
        my $bin = $redbin->[$sn];
        my $id  = unique();

        push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
            id        => $id,
            phase     => "reduce/$rno",
            server    => $bin->[0],
            altserver => $bin->[1],
            infile    => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
            outfile   => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
        );
    }

    return \@reds;
}
sub _plan_final {
my $job = shift;
my $redbin = shift;
my $ptasks = shift;
my $jid = $job->{request}{jobid};
my $id = unique();
return [
AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
server => $redbin->[0][0],
altserver => $redbin->[0][1],
( run in 2.377 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )