AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    for my $s (@$redbin){
        my $id = unique();
        push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
            id		=> $id,
            phase	=> "reduce/$rno",
            server	=> $s->[0],
            altserver	=> $s->[1],
            infile	=> [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
            outfile	=> _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
        );
        $sn++;
    }

    return \@reds;
}

# plan the single 'final' task of a job
#
#   $job     - job object; handed through to AC::MrGamoo::Job::TaskInfo->new
#   $redbin  - array ref of server lists; only the first entry is consulted,
#              its element 0 becomes the task's server, element 1 its altserver
#   $ptasks  - array ref of previous-phase tasks; the filename of each task's
#              first outfile becomes one input file of the final task
#
# returns an array ref holding exactly one TaskInfo (phase 'final', no outfiles)
sub _plan_final {
    my ($job, $redbin, $ptasks) = @_;

    # read the first bin through a guarded copy: a nested rvalue read such as
    # $redbin->[0][0] would autovivify $redbin->[0] in the caller's array
    # whenever the list is empty
    my ($server, $altserver) = @{ $redbin->[0] || [] };

    my $id = unique();
    return [
        AC::MrGamoo::Job::TaskInfo->new( $job,
            id        => $id,
            server    => $server,
            altserver => $altserver,
            phase     => 'final',
            infile    => [ map { $_->{outfile}[0]{filename} } @$ptasks ],
            outfile   => [ ],    # the final phase declares no output files
        ),
    ];
}

# describe the output files that one task will produce
#
#   $job     - job hash; $job->{request}{jobid} is embedded in every filename
#   $taskid  - id of the task writing the files
#   $nout    - number of output files to describe
#   $redbin  - array ref of array refs; entry $n is copied into the 'dst'
#              list of output file $n
#   $pfix    - filename prefix placed in front of the task id
#
# returns an array ref of { filename => ..., dst => [ ... ] } hashes,
# one per output, in order 0 .. $nout-1
sub _plan_outfiles {
    my ($job, $taskid, $nout, $redbin, $pfix) = @_;

    my $jobid = $job->{request}{jobid};

    my @outfiles = map {
        +{
            filename => "mrtmp/j_$jobid/${pfix}_${taskid}_$_",
            # shallow copy, so the descriptor does not share the bin's array
            dst      => [ @{ $redbin->[$_] } ],
        }
    } 0 .. $nout - 1;

    return \@outfiles;
}

# decide which servers take part in the map phase
#
#   $job     - job hash; $job->{options}{maps}, when numerically non-zero,
#              caps how many servers are flagged for use
#   $servers - array ref of server hashes carrying 'id' and 'metric'
#
# returns a hash ref keyed by server id; each value is
# { metric => <server metric>, use => 1|0 }. servers are walked in ascending
# metric order and flagged use => 1 until the cap is used up.
sub _plan_map_these_servers {
    my ($job, $servers) = @_;

    # limit number of servers? no (or zero) limit means every server
    my $remaining = ($job->{options}{maps} + 0) || scalar(@$servers);

    my @by_metric = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %plan;
    for my $srv (@by_metric) {
        my $use = $remaining ? 1 : 0;
        $remaining-- if $remaining;
        $plan{ $srv->{id} } = { metric => $srv->{metric}, use => $use };
    }

    return \%plan;
}

sub _plan_divy_files {
    my $job     = shift;
    my $files   = shift;
    my $servers = shift;

    my %filemap;
    my %bytes;
    my @copies;

    my $load = _plan_map_these_servers( $job, $servers );

    # divy files up among servers
    for my $f (sort { $b->{size} <=> $a->{size} } @$files){
        my($best_wgt, $best_loc);
        for my $loc ( @{$f->{location}} ){
            next unless exists $load->{$loc};	# down?
            next unless $load->{$loc}{use};
            my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
            if( !$best_loc || $w < $best_wgt ){
                $best_wgt = $w;
                $best_loc = $loc;
            }
        }

        if( $best_loc ){
            # a server has the file. process it there.
            push @{$filemap{$best_loc}}, $f;
            $bytes{$best_loc} += $f->{size};
            next;
        }

        # pick best 2 servers
        my($sa, $sb) =
          map { $_->[1] }
          sort{ $a->[0] <=> $b->[0] }
          map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
          grep { $load->{$_}{use} }
            (keys %$load);

        # copy the file
        my @loc = $sa;
        push @loc, $sb if $sb;
        my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
        debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");

        my $ff = {
            filename	=> $newfile,
            location	=> \@loc,
            size	=> $f->{size},
        };
        push @{$filemap{$sa}}, $ff;
        $bytes{$sa} += $ff->{size};

        # need to copy this file from its current location to the server(s) that will run map on it
        for my $loc (@loc){
            push @copies, AC::MrGamoo::Job::XferInfo->new( $job,
                 id		=> unique(),
                 filename	=> $f->{filename},



( run in 0.457 second using v1.01-cache-2.11-cpan-39bf76dae61 )