AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN


    # plan out the map phase
    my @phase = 'map';
    my($planmap, $plancopy) = _plan_map( $job, $servers, $files, $nr, $redbin );
    my @task  = { phase => 'map', task => $planmap };

    # plan out the reduce phases
    my $nrp = @{$job->{mr}{content}{reduce}};
    for my $r (0 .. $nrp - 1){
        push @phase, "reduce/$r";
        # last reduce has 1 outfile, otherwise nr.
        my $nout = ($r == $nrp - 1) ? 1 : $nr;
        push @task,  { phase => "reduce/$r", task => _plan_reduce($job, $r, $nout, $redbin, $task[-1]{task}) };
    }

    # plan out a final phase
    if( $job->{mr}{content}{final} ){
        push @phase, 'final';
        push @task,  { phase => 'final', task => _plan_final($job, $redbin, $task[-1]{task}) };
    }

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

            }

            my $id = unique();
            push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
                id	=> $id,
                phase	=> 'map',
                server  => $s,
                infile  => \@file,
                altplan	=> \%alt,
                _total  => $tot,
                outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
            );
        }
    }

    return (\@maptask, $copies);
}

sub _plan_reduce {
    my $job     = shift;
    my $rno     = shift;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN


    my @reds;
    my $sn = 0;
    for my $s (@$redbin){
        my $id = unique();
        push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
            id		=> $id,
            phase	=> "reduce/$rno",
            server	=> $s->[0],
            altserver	=> $s->[1],
            infile	=> [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
            outfile	=> _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
        );
        $sn++;
    }

    return \@reds;
}

sub _plan_final {
    my $job     = shift;
    my $redbin  = shift;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN


    my $jid = $job->{request}{jobid};

    my $id = unique();
    return [
        AC::MrGamoo::Job::TaskInfo->new( $job,
            id		=> $id,
            server	=> $redbin->[0][0],
            altserver	=> $redbin->[0][1],
            phase	=> 'final',
            infile	=> [ map { $_->{outfile}[0]{filename} } @$ptasks ],
            outfile	=> [ ],
        ),
       ];
}

sub _plan_outfiles {
    my $job     = shift;
    my $taskid  = shift;
    my $nout    = shift;
    my $redbin  = shift;
    my $pfix    = shift;

    my @out;
    my $jid = $job->{request}{jobid};

    for my $n (0 .. $nout - 1){

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        taskid		=> $me->{id},
        jobsrc		=> $job->{request}{jobsrc},
        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},
        infile		=> $ti->{infile},
        outfile		=> [ map { $_->{filename} } @{$ti->{outfile}} ],
        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);
        return;
    }

    # no success cb here. we will either timeout, or get a TaskStatus msg.

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

}

sub finished {
    my $me   = shift;
    my $t    = shift;
    my $job  = shift;

    delete $me->{instance}{ $t->{id} };

    $me->{finished} = 1;
    my $outfiles = $me->{outfile};
    my $server   = $t->{server};

    debug("task finished $me->{id} on $server");
    # copy files
    for my $fi (@$outfiles){
        # add to file_info - file is now on one server
        debug("  outfile $fi->{filename}");
        $job->{file_info}{ $fi->{filename} } = {
            filename	=> $fi->{filename},
            location	=> [ $server ],
        };
        $job->{server_info}{$server}{has_files}{$fi->{filename}} = 1;
        # QQQ - optionally leave final files?
        push @{$job->{tmp_file}}, { filename => $fi->{filename}, server => $server };

        # add to copy_pending
        foreach my $s ( @{$fi->{dst}} ){

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

    my @new;
    for my $as (keys %newplan){
        my $newid = unique();
        my $oldid = $me->{id};

        my $new = AC::MrGamoo::Job::TaskInfo->new($job,
            id		=> $newid,
            phase	=> $me->{phase},
            infile	=> $newplan{$as},
            replaces	=> $oldid,
            outfile	=> [ map {
                (my $f = $_->{filename}) =~ s/$oldid/$newid/;
                { dst => $_->{dst}, filename => $f, }
            } @{$me->{outfile}} ],
            server	=> $as,
        );
        debug("replan map $oldid => $newid on $as");

        # keep plan up to date
        $job->{plan}{taskidx}{$newid} = $new;
        push @{$job->{plan}{taskplan}[0]{task}}, $new;

        # move to pending queue
        $new->pend($job) if $job->{phase_no} == 0;

lib/AC/MrGamoo/Job/XferInfo.pm  view on Meta::CPAN

    delete $me->{instance}{ $x->{id} };

    # retry? replan? abort?

    my $server = $me->{dst};
    my $status = get_peer_status_from_id($server);
    my $loc    = $job->{file_info}{$me->{filename}}{location} || $me->{location};

    verbose("xfer failed $me->{id} $server ($status) $me->{filename} @$loc");

    my $skip = $job->{options}{skipmissinginputfiles};	# QQQ
    if( $job->{phase_no} == -1 && $skip ){
        # ignore
        return;
    }

    if( $status != 200 ){
        # replan tasks
        $job->_replan_server($server, 'xfer', $me);
        return;
    }

lib/AC/MrGamoo/OutFile.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-27 11:34 (EST)
# Function: buffer output + open/close files as needed
#
# $Id: OutFile.pm,v 1.2 2011/01/06 17:58:13 jaw Exp $

package AC::MrGamoo::OutFile;
use AC::MrGamoo::Debug 'outfile';
use IO::Compress::Gzip;
use strict;

# Module-wide state shared by all AC::MrGamoo::OutFile objects
# (the module's job, per its header: buffer output + open/close files as needed).

# buffer size threshold for pending output - presumably the point at which
# buffered data is written out; the code using it is not shown here (confirm)
my $BUFMAX         = 200000;
# per-process open file descriptor limit, filled in from the shell below
my $max_open;
# number of OutFile objects currently holding an open descriptor
# (maintained elsewhere in this module - TODO confirm)
my $currently_open = 0;
# registry of OutFile objects - presumably used to pick a file to close
# when $max_open is reached; verify against the rest of the module
my %all;


# ask a shell for the descriptor limit. NB: backticks keep the trailing
# newline, and some systems report "unlimited" here - any cleanup of the
# value must happen after this line.
$max_open = `sh -c "ulimit -n"`;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){
            _do_map( $me );
        }elsif( $me->{request}{phase} eq 'final' ){
            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";
        }
    };
    if( my $e = $@ ){
        my $myid = my_server_id();
        verbose( "ERROR: $myid - $e" );
        _send_eumsg($me, 'stderr', "ERROR: $myid - $e");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );
    _update_status( 'FINISHED', 0 );
    debug("finish child task");
    exit(0);
}

sub _setup_stdio_etal {

    # move socket to parent from STDOUT -> STATUS
    # so user code doesn't trample

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

# Throttled status reporter, installed as the task's progress callback
# (func_progress), which calls it as ($me, 'RUNNING', @args).
# Every call resets $^T to the current time; the remaining arguments are
# forwarded to _update_status at most once per $STATUSTIME seconds,
# measured from the last forwarded update ($me->{status_time}).
sub _maybe_update_status {
    my $me = shift;

    my $now = time();
    $^T = $now;

    # still inside the quiet period since the last report: do nothing
    my $due = $me->{status_time} + $STATUSTIME;
    return if $now < $due;

    $me->{status_time} = $now;
    return _update_status( @_ );
}

# Create one AC::MrGamoo::OutFile per name in $me->{request}{outfile},
# stored in the same order in $me->{outfd} (partition N writes to
# $me->{outfd}[N]). Each name is placed under the configured basedir, and
# its parent directory is created first (mode 0777, subject to umask).
sub _setup_outfiles {
    my $me = shift;

    my $compress = $me->attr(undef, 'compress');
    my @handles;

    # NB: a foreach over the deref autovivifies a missing outfile list
    # into [], which later code that dereferences it relies on
    foreach my $name ( @{$me->{request}{outfile}} ){
        my $base = conf_value('basedir');
        my $path = $base . '/' . $name;
        my($parent) = $path =~ m|^(.+)/[^/]+$|;

        # directory creation failures are intentionally not checked here
        eval{ mkpath($parent, undef, 0777) };

        push @handles, AC::MrGamoo::OutFile->new( $path, $compress );
    }

    $me->{outfd} = \@handles;
}

# Close every output handle created by _setup_outfiles, in partition
# order, then remove the list from the task object.
sub _close_outfiles {
    my $me = shift;

    $_->close() for @{ $me->{outfd} };

    delete $me->{outfd};
}

# Route one (key, data) record emitted by user code to an output file.
# The partition is the md5 of the key modulo $n (the number of output
# files), so a given key always lands in the same partition number. The
# record is written as one JSON line: [key, data].
#
#   $me   - running task; $me->{outfd} holds the OutFile handles
#   $n    - number of output files for this task
#   $key  - partitioning key (also written out)
#   $data - payload written alongside the key
#
# Dies (caught by the task's eval) if the task has no output files.
sub _output_partition {
    my ($me, $n, $key, $data) = @_;

    # a task planned with no outfiles (e.g. the final phase, outfile => [ ])
    # would otherwise die with an opaque "Illegal modulus zero"
    die "cannot output: this task has no output files\n" unless $n;

    # md5 is twice as fast as sha1.
    # anything written in perl is 10 times slower
    #
    # md5() croaks on characters above 255, so hash the utf-8 bytes of such
    # keys. keys without those characters hash exactly as before.
    my $bytes = $key;
    utf8::encode($bytes) if defined($bytes) && $bytes =~ /[^\x00-\xff]/;

    my $hash = unpack('N', md5( $bytes ));
    my $io   = $me->{outfd}[ $hash % $n ];
    $io->output( encode_json( [ $key, $data ] ), "\n" );
}


# end-user's 'print' come here
# STDOUT print handler for end-user code: the printed items are joined
# with $" (a space by default) and sent to the user's console session
# as a single 'stdout' message.
sub eu_print_stdout {
    my( $me, @text ) = @_;

    return _send_eumsg( $me, 'stdout', "@text" );
}

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    _send_eumsg($me, 'stderr', "@_");
}

################################################################

sub _do_map {
    my $me = shift;
    my $mr = $me->{mr};

    debug("doing map");
    my $n        = @{$me->{request}{outfile}};
    my $h_filter = $mr->get_code('filefilter');
    my $h_read   = $mr->get_code('readinput') || { code => \&readinput };
    my $h_map    = $mr->get_code('map');
    my $f_filter = $h_filter ? $h_filter->{code} : undef;
    my $f_read   = $h_read->{code};
    my $f_map    = $h_map->{code};

    my $linen = 0;

    my $maxrun = $me->attr($h_map, 'maxrun');

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


    $h_map->{cleanup}->()    if $h_map->{cleanup};
    $h_read->{cleanup}->()   if $h_read->{cleanup};
    $h_filter->{cleanup}->() if $h_filter && $h_filter->{cleanup};
}

sub _do_reduce {
    my $me = shift;
    my $mr = $me->{mr};

    my $n        = @{$me->{request}{outfile}};
    my($stage)   = $me->{request}{phase} =~ m|reduce/(\d+)|;
    my $h_reduce = $mr->get_code('reduce', $stage);
    my $f_reduce = $h_reduce->{code};
    my $rown = 0;

    my $maxrun = $me->attr($h_reduce, 'maxrun');
    alarm( $maxrun ) if $maxrun;

    debug( "doing reduce step $stage" );

lib/AC/protobuf/mrgamoo.pl  view on Meta::CPAN

                    'initres', 5, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'phase', 6, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'outfile', 7, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'infile', 8, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'master', 9, undef



( run in 0.448 second using v1.01-cache-2.11-cpan-fd5d4e115d8 )