AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:35 (EST)
# Function: plan a job's map, reduce and final phases as a set of tasks
#
# $Id: Plan.pm,v 1.1 2010/11/01 18:41:56 jaw Exp $

package AC::MrGamoo::Job::Plan;
use AC::MrGamoo::Debug 'plan';
use AC::Misc;

use strict;

# Planner tuning knobs (file-scoped, not yet configurable).
my( $REDUCEFACTOR, $MAPTARGETMIN, $MAPSIZELIMIT ) = (
    1.9,            # reduces per server when options{reduces} is unset -- QQQ - config?
    8,              # try to have at least this many maps/server
    100_000_000,    # cap on the per-map-task size target (sum of file {size})
);

# Build a complete execution plan for a job.
#   $class   - package to bless into
#   $job     - job object ({options}, {mr}{content}{reduce}, {mr}{content}{final}, ...)
#   $servers - arrayref of server records ({id}, {metric}, ...)
#   $files   - arrayref of input file records
# Returns the blessed plan, or empty/undef when there are no servers.
sub new {
    my( $class, $job, $servers, $files ) = @_;

    # nothing can be planned without at least one server
    return unless @$servers;

    my $nserver = scalar @$servers;

    # how many reduces, and which server (plus alternate) hosts each reduce bin
    my $nr     = _number_of_reduces( $job->{options}, $nserver );
    my $redbin = _pick_reduce_bins( $nr, $servers );

    # map phase; also reports the input files that must be copied first
    my( $planmap, $plancopy ) = _plan_map( $job, $servers, $files, $nr, $redbin );
    my @task = ( { phase => 'map', task => $planmap } );

    # one phase per reduce step, each fed by the phase planned before it.
    # every reduce but the last fans out to $nr outputs; the last writes 1.
    my $nrp = scalar @{ $job->{mr}{content}{reduce} };
    for my $r ( 0 .. $nrp - 1 ){
        my $nout     = $r == $nrp - 1 ? 1 : $nr;
        my $upstream = $task[-1]{task};
        push @task, {
            phase => "reduce/$r",
            task  => _plan_reduce( $job, $r, $nout, $redbin, $upstream ),
        };
    }

    # optional final phase, fed by the last phase planned so far
    if( $job->{mr}{content}{final} ){
        my $upstream = $task[-1]{task};
        push @task, {
            phase => 'final',
            task  => _plan_final( $job, $redbin, $upstream ),
        };
    }

    # phase names in execution order, plus an id => task lookup table
    my @phase = map { $_->{phase} } @task;
    my %task  = map { ( $_->{id}, $_ ) } map { @{ $_->{task} } } @task;

    # debug("plan: " . dumper( \@task ));

    debug("infiles: " . @$files . ", precopy: " . @$plancopy . ", maps: " . @$planmap . ", reduces: $nr x $nrp");

    return bless {
        nserver  => $nserver,
        nreduce  => $nr,
        copying  => $plancopy,
        phases   => \@phase,
        taskplan => \@task,
        redbin   => $redbin,
        taskidx  => \%task,
    }, $class;
}

# Decide how many reduce bins the job gets.
#   $config  - job options hashref; a non-zero {reduces} overrides the default
#   $nserver - number of available servers
# Returns a positive integer: the configured count (truncated), or
# int($nserver * $REDUCEFACTOR) when unset/0; never less than 1.
sub _number_of_reduces {
    my $config  = shift;
    my $nserver = shift;

    my $nr = $config->{reduces} + 0;
    $nr ||= $nserver * $REDUCEFACTOR;

    # Truncate to a whole count. A fractional value would otherwise be
    # stored as the plan's nreduce, while the 0 .. $nr-1 ranges that
    # build the bins and outfiles silently truncate it.
    $nr = int $nr;
    $nr = 1 if $nr < 1;

    return $nr;
}

# Assign each of the $nr reduce bins to servers.
#   $nr      - number of reduce bins
#   $servers - arrayref of server records (each with an {id})
# Returns an arrayref of bins. Each bin is [ primary_id ] or, with more
# than one server, [ primary_id, alternate_id ].
sub _pick_reduce_bins {
    my( $nr, $servers ) = @_;

    # bin $i goes to server ($i mod N); the next server round-robin is the alternate
    my $nsrv = @$servers;
    my @bins;
    for my $i ( 0 .. $nr - 1 ){
        my @where = ( $servers->[ $i % $nsrv ]{id} );
        push @where, $servers->[ ($i + 1) % $nsrv ]{id} if $nsrv > 1;
        push @bins, \@where;
    }

    # shuffle() comes from AC::Misc; presumably it reorders the bins in place
    # (its return value is not used) -- confirm against AC::Misc
    shuffle( \@bins );

    return \@bins;
}

# Plan the map phase.
#   $job     - job object
#   $servers - arrayref of server records
#   $files   - arrayref of input file records ({filename}, {size}, {location})
#   $nr      - number of reduce bins (outputs per map task)
#   $redbin  - reduce bins from _pick_reduce_bins
# Returns ( \@maptasks, $copies ), where $copies comes from _plan_divy_files.
sub _plan_map {
    my $job     = shift;
    my $servers = shift;
    my $files   = shift;
    my $nr      = shift;
    my $redbin  = shift;

    # plan map
    #  divy files among servers
    #  split server + files into tasks

    my( $filemap, $copies ) = _plan_divy_files( $job, $files, $servers );

    my @maptask;
    for my $s (keys %$filemap){
        # aim for roughly $MAPTARGETMIN tasks on this server, but never let
        # the per-task size target exceed $MAPSIZELIMIT
        my $totalsize = 0;
        $totalsize += $_->{size} for @{$filemap->{$s}};
        my $sizelimit = $totalsize / $MAPTARGETMIN;
        $sizelimit = $MAPSIZELIMIT if $sizelimit > $MAPSIZELIMIT;

        # largest files first
        my @todo = sort { $b->{size} <=> $a->{size} } @{$filemap->{$s}};
        while( @todo ){
            my @file;
            my %alt;
            my $tot = 0;

            # Take files until the size target is reached, but always take at
            # least one. If every file here is zero-sized, $sizelimit is 0.
            # Without the !@file guard no file would ever be consumed, and this
            # outer loop would spin forever creating empty tasks.
            while( @todo && (!@file || $tot < $sizelimit) ){
                my $f = shift @todo;
                $tot += $f->{size};
                push @file, $f->{filename};
                # backup plan: remember the file's second location, if any
                my $as = $f->{location}[1];
                $alt{$f->{filename}} = $as if $as;
            }

            my $id = unique();
            push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
                id      => $id,
                phase   => 'map',
                server  => $s,
                infile  => \@file,
                altplan => \%alt,
                _total  => $tot,
                outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
            );
        }
    }

    return (\@maptask, $copies);
}

# Plan one reduce phase: one task per reduce bin.
#   $job    - job object
#   $rno    - reduce step number (phase "reduce/$rno")
#   $nout   - number of outputs each reduce task writes
#   $redbin - reduce bins ([ primary, optional alternate ] server ids)
#   $ptasks - tasks of the preceding phase; reduce task $sn reads output
#             slot $sn from each of them
# Returns an arrayref of the tasks built by AC::MrGamoo::Job::TaskInfo->new.
sub _plan_reduce {
    my $job     = shift;
    my $rno     = shift;
    my $nout    = shift;
    my $redbin  = shift;
    my $ptasks  = shift;

    my @reds;
    for my $sn ( 0 .. $#$redbin ){
        my $bin = $redbin->[$sn];
        my $id  = unique();
        push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
            id        => $id,
            phase     => "reduce/$rno",
            server    => $bin->[0],
            altserver => $bin->[1],
            infile    => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
            outfile   => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
        );
    }

    return \@reds;
}

# Plan the single 'final' task.
#   $job    - job object
#   $redbin - reduce bins; the task runs on the first bin's primary server,
#             with that bin's alternate (if any) as altserver
#   $ptasks - tasks of the preceding phase; output slot 0 of each is read
# Returns a one-element arrayref. The final task declares no outfiles.
sub _plan_final {
    my $job     = shift;
    my $redbin  = shift;
    my $ptasks  = shift;

    my $id = unique();
    return [
        AC::MrGamoo::Job::TaskInfo->new( $job,
            id        => $id,
            server    => $redbin->[0][0],
            altserver => $redbin->[0][1],
            phase     => 'final',
            infile    => [ map { $_->{outfile}[0]{filename} } @$ptasks ],
            outfile   => [ ],
        ),
    ];
}

# Build the output-file list for one task.
#   $job    - job object; its {request}{jobid} names the temp directory
#   $taskid - id of the task producing the files
#   $nout   - number of output files
#   $redbin - reduce bins; output $n is routed to a copy of $redbin->[$n]
#   $pfix   - filename prefix (e.g. 'map', "red$rno")
# Returns an arrayref of { filename => ..., dst => [ server ids ] }.
sub _plan_outfiles {
    my( $job, $taskid, $nout, $redbin, $pfix ) = @_;

    my $jid = $job->{request}{jobid};

    return [
        map {
            +{
                filename => "mrtmp/j_$jid/${pfix}_${taskid}_$_",
                dst      => [ @{ $redbin->[$_] } ],
            }
        } 0 .. $nout - 1
    ];
}

# Decide which servers may run map tasks.
#   $job     - job object; {options}{maps} optionally limits the server count
#   $servers - arrayref of server records ({id}, {metric})
# Servers are ranked by ascending {metric}. The first {maps} of them get
# use => 1 and the rest use => 0. An unset/0 limit uses every server
# (as does a negative one, which never counts down to 0).
# Returns { server_id => { metric => ..., use => 0|1 } }.
sub _plan_map_these_servers {
    my $job     = shift;
    my $servers = shift;

    # limit number of servers? int() matters: a fractional limit such as 2.5
    # would step 2.5, 1.5, 0.5, -0.5, ... and never reach 0, enabling all servers
    my $nm = int( $job->{options}{maps} + 0 ) || @$servers;

    my %data;
    for my $s ( sort { $a->{metric} <=> $b->{metric} } @$servers ){
        $data{ $s->{id} } = { metric => $s->{metric}, use => ($nm ? 1 : 0) };
        $nm -- if $nm;
    }

    return \%data;
}

sub _plan_divy_files {
    my $job     = shift;
    my $files   = shift;
    my $servers = shift;

    my %filemap;
    my %bytes;
    my @copies;

    my $load = _plan_map_these_servers( $job, $servers );

    # divy files up among servers
    for my $f (sort { $b->{size} <=> $a->{size} } @$files){
        my($best_wgt, $best_loc);
        for my $loc ( @{$f->{location}} ){
            next unless exists $load->{$loc};	# down?
            next unless $load->{$loc}{use};
            my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
            if( !$best_loc || $w < $best_wgt ){
                $best_wgt = $w;
                $best_loc = $loc;
            }
        }

        if( $best_loc ){
            # a server has the file. process it there.
            push @{$filemap{$best_loc}}, $f;
            $bytes{$best_loc} += $f->{size};

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 1.605 second using v1.00-cache-2.02-grep-82fe00e-cpan-f5108d614456 )