AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/example.mrjob  view on Meta::CPAN

%# -*- mason -*-
# Copyright (c) 2009 by AdCopy
# Author: Jeff Weisberg
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $

<%doc>
    map reduce example
</%doc>
%################################################################
%# provide values for configurable parameters
%# these override the defaults
%# and params specified on the command line override these
<%config>
    system      => blargh
    tasktimeout => 120
</%config>
%################################################################

eg/example.mrjob  view on Meta::CPAN

%# used to load modules
<%common>
    use AC::Misc;
    use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
    $R->print("starting map/reduce job");

    return {
        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
</%attr>
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

    # return a key + a value

eg/filelist.pm  view on Meta::CPAN

    # the keys in  yenta are of the form: 20100126150139_[...]
    my $start = isotime($tmin);		# 1286819830       => 20101011T175710Z
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20101011T175710Z => 20101011175710


    my @files = grep {
        # does this file match the request?
        ($_->{subsystem}   eq $syst) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        # get meta-data on this file. data is json encoded
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        # convert space seperated locations to arrayref
        $d->{location} = [ (split /\s+/, $d->{location}) ];
        $d;
    } $yenta->getrange($start, undef);	# get all files from $start to now


    return \@files;

eg/readinput.pm  view on Meta::CPAN

    # our file is newline delimted json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    my $d = json_decode($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.

    return (undef, 0) if $d->{tstart} <  $R->config('start');
    return (undef, 0) if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo.pm  view on Meta::CPAN


=item port

specify the TCP port to use

    port 3504

=item environment

specify the environment or realm to run in, so you can run multiple
independent map/reduce networks, such as production, staging, and dev.

    environment prod

=item allow

specify networks allowed to connect.

    allow 127.0.0.1
    allow 192.168.10.0/24

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $

package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN


    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20091109T123456... => 20091109123456

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw

    $syst = undef if $syst eq '*';
    $mode = undef if $mode eq '*';
    $syst =~ s/[ ,]/\|/g;
    if( $syst ){
        $syst = qr/^($syst)$/;
    }

    debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
    my @files = grep {
        (!$mode || ($_->{environment} eq $mode)) &&
        (!$syst || ($_->{subsystem}   =~ $syst)) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        #debug("file: $_");
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        $d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
        $d;
    } $yenta->getrange($start, undef);

    debug("found " .scalar(@files)." files");
    return \@files;
}


1;

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN

    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        last if @serverlist;
    }

    # sort+filter list
    @serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }
      grep { $_->{status} == 200 } @serverlist;

    # try all addresses
    # RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?

    my @addrlist = map { @{$_->{ip}} } @serverlist;

    for my $ip (@addrlist){
        my $addr = inet_itoa($ip->{ipv4});
        my $res;
        eval {
            alarm(30);
            $res = $me->_submit_to( $addr, $ip->{port}, $req );
            alarm(0);
        };
        next unless $res && $res->{status_code} == 200;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

}

################################################################

# Run _check over every section of the compiled job program, in order:
# the map section, each reduce stage (by index), then the final section.
# Always returns 1. _check itself is not visible here; presumably it
# dies when a section fails to check (NOTE(review): confirm).
sub check_code {
    my $me = shift;

    my $mr     = $me->{program};
    my $stages = scalar @{ $mr->{content}{reduce} };

    $me->_check('map');
    for my $stage (0 .. $stages - 1){
        $me->_check('reduce', $stage);
    }
    $me->_check('final');

    return 1;
}

sub _check {
    my $me = shift;
    my $mr = $me->{program};

lib/AC/MrGamoo/Default/FileList.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $

package AC::MrGamoo::Default::FileList;
use strict;


# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.1 2010/11/01 18:41:42 jaw Exp $

package AC::MrGamoo::FileList;
use AC::MrGamoo::Customize;
use AC::Import;
use strict;

our @ISA    = 'AC::MrGamoo::Customize';
our @EXPORT = qw(get_file_list);

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

    use lib '/myperldir';
    my $m = AC::MrGamoo::D->new(
        class_filelist    => 'Local::MrGamoo::FileList',
    );

=head1 IMPORTANT

You can fire up the system, and get the servers talking to each other, and
perform some limited tests without this file.

But you must provide this file in order to actually run map/reduce jobs.

=head1 DESCRIPTION

MrGamoo only runs map/reduce jobs.
It is up to you to get the files onto the servers,
keep track of where they are, and tell MrGamoo.

Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.

When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.

Each element of the returned arrayref should be a hashref
containing at least the following fields:

=head2 filename

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

#
# $Id: Plan.pm,v 1.1 2010/11/01 18:41:56 jaw Exp $

package AC::MrGamoo::Job::Plan;
use AC::MrGamoo::Debug 'plan';
use AC::Misc;

use strict;

# NOTE(review): not referenced in the visible code; presumably consulted
# when computing the number of reduces (_number_of_reduces) — confirm.
my $REDUCEFACTOR = 1.9;		# QQQ - config?
# _plan_map splits each server's share of the input into tasks of about
# (server total bytes / $MAPTARGETMIN) bytes.
my $MAPTARGETMIN = 8;		# try to have at least this many maps/server
# upper bound, in bytes, on that per-task size budget in _plan_map
my $MAPSIZELIMIT = 100_000_000;

sub new {
    my $class   = shift;
    my $job     = shift;
    my $servers = shift;
    my $files   = shift;

    return unless @$servers;

    # how many reduces?
    my $nr = _number_of_reduces( $job->{options}, scalar @$servers );

    # map servers to reduce bins
    my $redbin = _pick_reduce_bins( $nr, $servers );

    # plan out the map phase
    my @phase = 'map';
    my($planmap, $plancopy) = _plan_map( $job, $servers, $files, $nr, $redbin );
    my @task  = { phase => 'map', task => $planmap };

    # plan out the reduce phases
    my $nrp = @{$job->{mr}{content}{reduce}};
    for my $r (0 .. $nrp - 1){
        push @phase, "reduce/$r";
        # last reduce has 1 outfile, otherwise nr.
        my $nout = ($r == $nrp - 1) ? 1 : $nr;
        push @task,  { phase => "reduce/$r", task => _plan_reduce($job, $r, $nout, $redbin, $task[-1]{task}) };
    }

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    # summary
    my %task;
    for my $ts (@task){
        for my $t ( @{$ts->{task}} ){
            $task{ $t->{id} } = $t;
        }
    }

    # debug("plan: " . dumper( \@task ));

    debug("infiles: " . @$files . ", precopy: " . @$plancopy . ", maps: " . @$planmap . ", reduces: $nr x $nrp");

    return bless {
        nserver		=> scalar(@$servers),
        nreduce		=> $nr,
        copying		=> $plancopy,
        phases		=> \@phase,
        taskplan	=> \@task,
        redbin		=> $redbin,
        taskidx		=> \%task,
    }, $class;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN


        # pick alt location
        next unless @$servers > 1;
        $redbin[$bin][1] = $servers->[ ($bin + 1) % @$servers ]->{id};
    }
    shuffle(\@redbin);

    return \@redbin;
}

# Plan the map phase.
#
# Divides the input files among the servers (via _plan_divy_files), then
# splits each server's share into map tasks. Each server's files are
# packed, largest first, into tasks whose size budget is that server's
# total bytes / $MAPTARGETMIN, capped at $MAPSIZELIMIT. Every task gets
# at least one file.
#
# Arguments: $job, $servers (arrayref), $files (arrayref of file hashrefs),
#            $nr (number of map output partitions), $redbin (reduce bins).
# Returns (\@maptask, $copies): the AC::MrGamoo::Job::TaskInfo objects for
# the map phase, and the copy list returned by _plan_divy_files.
sub _plan_map {
    my $job     = shift;
    my $servers = shift;
    my $files   = shift;
    my $nr      = shift;
    my $redbin  = shift;

    # plan map
    #  divy files among servers
    #  split server + files into tasks

    my( $filemap, $copies ) = _plan_divy_files( $job, $files, $servers );

    my @maptask;
    for my $s (keys %$filemap){
        my $totalsize = 0;
        $totalsize += $_->{size} for @{$filemap->{$s}};
        my $sizelimit = $totalsize / $MAPTARGETMIN;
        $sizelimit = $MAPSIZELIMIT if $sizelimit > $MAPSIZELIMIT;

        # largest files first
        my @todo = sort { $b->{size} <=> $a->{size} } @{$filemap->{$s}};
        while( @todo ){
            my @file;
            my %alt;
            my $tot = 0;

            # Always take at least one file per task. Otherwise a zero size
            # limit (every file on this server has size 0 or no size) would
            # never consume @todo, and the outer loop would spin forever.
            while( @todo && (!@file || $tot < $sizelimit) ){
                my $f = shift @todo;
                $tot += $f->{size};
                push @file, $f->{filename};
                # backup plan: remember the file's second location, if any
                my $as = $f->{location}[1];
                $alt{$f->{filename}} = $as if $as;
            }

            my $id = unique();
            push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
                id      => $id,
                phase   => 'map',
                server  => $s,
                infile  => \@file,
                altplan => \%alt,
                _total  => $tot,
                outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
            );
        }
    }

    return (\@maptask, $copies);
}

# Plan reduce phase number $rno: one task per reduce bin.
#
# Task number $sn runs on the bin's first server ($redbin->[$sn][0]).
# Its altserver is the bin's second server ($redbin->[$sn][1], which may be
# undef). It reads output file $sn of every task in $ptasks (the previous
# phase). Each task's $nout output files come from _plan_outfiles, with
# "red$rno" passed as the file-name prefix.
#
# Returns an arrayref of AC::MrGamoo::Job::TaskInfo objects.
sub _plan_reduce {
    my $job     = shift;
    my $rno     = shift;
    my $nout    = shift;
    my $redbin  = shift;
    my $ptasks  = shift;

    my @reds;
    my $sn = 0;
    for my $s (@$redbin){
        my $id = unique();
        push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
            id          => $id,
            phase       => "reduce/$rno",
            server      => $s->[0],
            altserver   => $s->[1],
            # partition $sn of each previous-phase task's output
            infile      => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
            outfile     => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
        );
        $sn++;
    }

    return \@reds;
}

sub _plan_final {
    my $job     = shift;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN


    my $jid = $job->{request}{jobid};

    my $id = unique();
    return [
        AC::MrGamoo::Job::TaskInfo->new( $job,
            id		=> $id,
            server	=> $redbin->[0][0],
            altserver	=> $redbin->[0][1],
            phase	=> 'final',
            infile	=> [ map { $_->{outfile}[0]{filename} } @$ptasks ],
            outfile	=> [ ],
        ),
       ];
}

sub _plan_outfiles {
    my $job     = shift;
    my $taskid  = shift;
    my $nout    = shift;
    my $redbin  = shift;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    my @out;
    my $jid = $job->{request}{jobid};

    for my $n (0 .. $nout - 1){
        push @out, { filename => "mrtmp/j_$jid/${pfix}_${taskid}_$n", dst => [ @{$redbin->[$n]} ] };
    }

    return \@out;
}

# Decide which servers may run map tasks.
#
# Servers are ranked by ascending metric. The first N are marked usable,
# where N is the job option 'maps' (numeric); when that option is 0 or
# unset, every server is usable.
# Returns a hashref: server id => { metric => ..., use => 1|0 }.
sub _plan_map_these_servers {
    my $job     = shift;
    my $servers = shift;

    # how many more servers may still be enabled
    my $remaining = ($job->{options}{maps} + 0) || @$servers;

    my @by_metric = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %data;
    for my $srv (@by_metric){
        my $enabled = 0;
        if( $remaining ){
            $enabled = 1;
            $remaining--;
        }
        $data{ $srv->{id} } = { metric => $srv->{metric}, use => $enabled };
    }

    return \%data;
}

# Assign every input file to a map server.
#
# Only servers marked usable by _plan_map_these_servers are considered.
# Files are handled largest first:
#   - If the file already lives on one or more usable servers, it goes to
#     the one with the lowest weight
#         (1 + bytes already assigned) * (1 + metric).
#     This spreads load while favouring low-metric servers.
#   - Otherwise the file is scheduled to be copied to mrtmp/j_<jobid>/ on
#     the (up to) two lowest-weight usable servers. The map is planned on
#     the first of them, and the copy's location list names both.
#
# Returns (\%filemap, \@copies):
#   %filemap - server id => arrayref of file hashrefs to map on that server
#   @copies  - AC::MrGamoo::Job::XferInfo objects, one per destination copy
sub _plan_divy_files {
    my $job     = shift;
    my $files   = shift;
    my $servers = shift;

    my %filemap;
    # server id => total bytes assigned so far (used in the weight)
    my %bytes;
    my @copies;

    my $load = _plan_map_these_servers( $job, $servers );

    # divy files up among servers
    for my $f (sort { $b->{size} <=> $a->{size} } @$files){
        my($best_wgt, $best_loc);
        for my $loc ( @{$f->{location}} ){
            next unless exists $load->{$loc};	# down?
            next unless $load->{$loc}{use};
            my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
            if( !$best_loc || $w < $best_wgt ){
                $best_wgt = $w;
                $best_loc = $loc;
            }
        }

        if( $best_loc ){
            # a server has the file. process it there.
            push @{$filemap{$best_loc}}, $f;
            $bytes{$best_loc} += $f->{size};
            next;
        }

        # pick best 2 servers
        # (lowest weight among usable servers; ties are broken by hash key
        # order, so the choice among equal weights is not deterministic)
        my($sa, $sb) =
          map { $_->[1] }
          sort{ $a->[0] <=> $b->[0] }
          map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
          grep { $load->{$_}{use} }
            (keys %$load);

        # copy the file
        my @loc = $sa;
        push @loc, $sb if $sb;
        my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
        debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");

        # the copy is mapped on $sa; its location list also names $sb (if any)
        my $ff = {
            filename	=> $newfile,
            location	=> \@loc,
            size	=> $f->{size},
        };
        push @{$filemap{$sa}}, $ff;
        $bytes{$sa} += $ff->{size};

        # need to copy this file from its current location to the server(s) that will run map on it
        for my $loc (@loc){
            push @copies, AC::MrGamoo::Job::XferInfo->new( $job,
                 id		=> unique(),
                 filename	=> $f->{filename},
                 dstname	=> $newfile,
                 size		=> $f->{size},
                 location	=> $f->{location},
                 dst		=> $loc,
                );
        }
    }

    return (\%filemap, \@copies);
}

1;

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        taskid		=> $me->{id},
        jobsrc		=> $job->{request}{jobsrc},
        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},
        infile		=> $ti->{infile},
        outfile		=> [ map { $_->{filename} } @{$ti->{outfile}} ],
        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);
        return;
    }

    # no success cb here. we will either timeout, or get a TaskStatus msg.

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

      if $me->{replaces};

    return $me->_replan_altserver($job) if $me->{altserver};

    if( $me->{phase} eq 'reduce' ){
        verbose("cannot replan task. no altserver");
        $job->abort(reason => "cannot replan task. no alternate server available");
        return;
    }

    $me->_replan_map($job);
}

# Move this task to its alternate server and put it back in the pending queue.
# Promotes altserver to server, removes altserver (so there is no further
# fallback), and clears the retry count before calling pend().
sub _replan_altserver {
    my($me, $job) = @_;

    $me->{server} = delete $me->{altserver};
    delete $me->{retries};

    debug("replanning task to new server");
    $me->pend($job);
}

sub _replan_map {
    my $me  = shift;
    my $job = shift;

    # remove task
    # divy files among servers
    # create new tasks
    # rediddle next phase

    my %newplan;	# server => @files

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

    my @new;
    for my $as (keys %newplan){
        my $newid = unique();
        my $oldid = $me->{id};

        my $new = AC::MrGamoo::Job::TaskInfo->new($job,
            id		=> $newid,
            phase	=> $me->{phase},
            infile	=> $newplan{$as},
            replaces	=> $oldid,
            outfile	=> [ map {
                (my $f = $_->{filename}) =~ s/$oldid/$newid/;
                { dst => $_->{dst}, filename => $f, }
            } @{$me->{outfile}} ],
            server	=> $as,
        );
        debug("replan map $oldid => $newid on $as");

        # keep plan up to date
        $job->{plan}{taskidx}{$newid} = $new;
        push @{$job->{plan}{taskplan}[0]{task}}, $new;

        # move to pending queue
        $new->pend($job) if $job->{phase_no} == 0;

        push @new, $new;
    }

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

    $me->_replan_replace_files( $job, @new );

}

sub _replan_replace_files {
    my $me  = shift;
    my $job = shift;
    my @new = shift;

    my $oldid = $me->{id};
    my $curphase = 0;	# map
    my $nxtphase = 1;	# reduce/0

    # remove old task's files, add new tasks' files
    for my $ti ( @{$job->{plan}{taskplan}[$nxtphase]{task}} ){
        my @infile;
        for my $file (@{$ti->{infile}}){
            if( $file =~ /$oldid/ ){
                for my $new (@new){
                    my $newid = $new->{id};
                    (my $n = $file) =~ s/$oldid/$newid/;

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

                        );
    }
    return $txt;
}

# Return a JSON-encoded (newline-terminated) array describing every known
# peer. Each element contains the fields listed in @fields plus an 'ip'
# list of { ipv4 (dotted string), port, natdom } entries.
sub report_json {

    my $peers  = peer_list_all();
    my @fields = qw(hostname environment subsystem datacenter server_id status sort_metric);

    my @report;
    for my $peer (@$peers){
        my %entry;
        @entry{@fields} = @{$peer}{@fields};

        my @addrs = map {
            +{
                ipv4   => inet_itoa($_->{ipv4}),
                port   => $_->{port},
                natdom => $_->{natdom},
            }
        } @{$peer->{ip}};
        $entry{ip} = \@addrs;

        push @report, \%entry;
    }

    return encode_json( \@report ) . "\n";
}

################################################################

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN

return the name of the local datacenter. mrgamoo will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
        return $domain;
    }

Note: map/reduce jobs are extremely network intensive. It is not
recommended to spread your servers out; you really want them all
plugged into one big, fast switch.

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));

lib/AC/MrGamoo/PeerList.pm  view on Meta::CPAN

# return an array of:
#   {
#     id     => mrgamoo@a2be021ad31c
#     metric => 2
#   }

# Return an arrayref of { id => server_id, metric => sort_metric } for
# every peer whose status is 200.
sub get_peer_list {

    my $all = peer_list_all();

    my @up = grep { $_->{status} == 200 } @$all;

    my @list = map {
        +{
            id     => $_->{server_id},
            metric => $_->{sort_metric},
        }
    } @up;

    return \@list;
}

sub get_peer_addr_from_id {
    my $id = shift;

lib/AC/MrGamoo/ReadInput.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 12:00 (EST)
# Function: read files to map
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:44 jaw Exp $

package AC::MrGamoo::ReadInput;
use AC::MrGamoo::Customize;
use AC::Import;
use strict;

our @ISA    = 'AC::MrGamoo::Customize';
our @EXPORT = qw(readinput);

lib/AC/MrGamoo/ReadInput.pm  view on Meta::CPAN

    emacs /myperldir/Local/MrGamoo/ReadInput.pm
    copy. paste. edit.

    use lib '/myperldir';
    my $m = AC::MrGamoo::D->new(
        class_readinput    => 'Local::MrGamoo::ReadInput',
    );

=head1 DESCRIPTION

In your map/reduce job, your C<map> function is called once per record.
The C<readinput> function is responsible for reading the actual files
and returning records.

The default C<readinput> returns one line at a time (just like <FILE>).

If you want different behavior, you can provide a C<ReadInput> class,
or specify a C<readinput> block in your map/reduce job.

Your function should return a list of 2 values:

=head2 record

the record data

=head2 eof

have we reached the end-of-file

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN


package AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Compile::Block;
use strict;

# Sections recognized in a map/reduce job file, keyed by section name.
# A job file writes them as <%config>, <%init>, <%map>, <%reduce>, ...
#   tag      - which kind of parser handles the section body
#              (NOTE(review): inferred from the values; the parser is not shown here)
#   multi    - presumably whether the section may appear more than once
#              (reduce is multi, and reduce stages are indexed as an array
#              elsewhere via content{reduce}) — confirm
#   required - presumably the section must be present in every job — confirm
my %COMPILE = (
    config	=> { tag => 'config',  multi => 1, },
    doc 	=> { tag => 'block',   multi => 1, },
    init	=> { tag => 'block',   multi => 0, },
    common	=> { tag => 'simple',  multi => 0, },
    map		=> { tag => 'block',   multi => 0, required => 1, },
    reduce	=> { tag => 'block',   multi => 1, required => 1, },
    final	=> { tag => 'block',   multi => 0, },
    readinput	=> { tag => 'block',   multi => 0, },
    filefilter	=> { tag => 'block',   multi => 0, },
    );

my %BLOCK = (
    init	=> 'simple',
    cleanup	=> 'simple',
    attr	=> 'config',

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){
            _do_map( $me );
        }elsif( $me->{request}{phase} eq 'final' ){
            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";
        }
    };
    if( my $e = $@ ){
        my $myid = my_server_id();
        verbose( "ERROR: $myid - $e" );
        _send_eumsg($me, 'stderr', "ERROR: $myid - $e");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

}

# Send the arguments, joined with the list separator ($"), to the
# end user as a 'stderr' message.
sub eu_print_stderr {
    my($me, @msg) = @_;

    _send_eumsg($me, 'stderr', join($", @msg));
}

################################################################

# Run the map phase for this task.
#
# For each input file named in the request:
#   - optionally skip it via the job's 'filefilter' block;
#   - open it (read-only) under conf_value('basedir');
#   - read records with the job's 'readinput' block, or the default
#     readinput;
#   - pass each defined record to the 'map' block;
#   - hand each (key, data) pair with a defined key to _output_partition,
#     along with the output-file count $n.
# Dies if an input file cannot be opened.
#
# If the map block has a 'maxrun' attribute, an alarm of that many seconds
# is armed for the whole phase (it is not cleared here). Finally runs any
# cleanup code attached to the map / readinput / filefilter blocks.
sub _do_map {
    my $me = shift;
    my $mr = $me->{mr};

    debug("doing map");
    my $n        = @{$me->{request}{outfile}};
    my $h_filter = $mr->get_code('filefilter');
    my $h_read   = $mr->get_code('readinput') || { code => \&readinput };
    my $h_map    = $mr->get_code('map');
    my $f_filter = $h_filter ? $h_filter->{code} : undef;
    my $f_read   = $h_read->{code};
    my $f_map    = $h_map->{code};

    my $linen = 0;

    my $maxrun = $me->attr($h_map, 'maxrun');
    alarm( $maxrun ) if $maxrun;

    for my $file (@{$me->{request}{infile}}){
        _maybe_update_status( $me, 'RUNNING', $linen );
        $me->{R}{config}{current_file} = $file;    # in case user wants for debugging

        # filter file list
        if( $f_filter ){
            next unless $f_filter->( $file );
        }

        debug("map file: $file");

        my $f = conf_value('basedir') . '/' . $file;

        # 3-arg open: the file name comes from the job request. 2-arg open
        # would treat leading/trailing '<', '>', '|' or whitespace in it as
        # an open mode or as a pipe through a shell command.
        open(my $fd, '<', $f) || die "cannot open file '$f': $!\n";

        while(1){
            _maybe_update_status( $me, 'RUNNING', $linen++ );

            # read input
            my($d, $eof) = $f_read->( $fd );
            last if $eof;
            next unless defined $d;

            # map
            my($key, $data) = $f_map->( $d );
            next unless defined $key;
            _output_partition( $me, $n, $key, $data );
        }

        close($fd);
    }

    $h_map->{cleanup}->()    if $h_map->{cleanup};
    $h_read->{cleanup}->()   if $h_read->{cleanup};
    $h_filter->{cleanup}->() if $h_filter && $h_filter->{cleanup};
}

sub _do_reduce {
    my $me = shift;
    my $mr = $me->{mr};

    my $n        = @{$me->{request}{outfile}};
    my($stage)   = $me->{request}{phase} =~ m|reduce/(\d+)|;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

}

################################################################

sub _sort_cmd {
    my $me = shift;
    my $hc = shift;

    my $gz   = $me->attr(undef, 'compress');
    my $sort = $me->attr($hc,'sortprog') || conf_value('sortprog') || $SORTPROG;
    my @file = map { conf_value('basedir') . '/' . $_ } @{$me->{request}{infile}};

    if( $gz ){
        my $zcat = $me->attr($hc,'gzprog') || conf_value('gzprog') || $GZPROG;
        my $cmd  = $zcat . ' ' . join(' ', @file) . ' | ' . $sort;
        debug("running cmd: $cmd");
        return $cmd;
    }else{
        my @cmd = ($sort, @file);
        debug("running cmd: @cmd");
        return @cmd;

lib/AC/MrGamoo/User.pm  view on Meta::CPAN

    no warnings;

    # export R
    *{$caller . '::R'} = \$AC::MrGamoo::User::R
}

1;

=head1 NAME

AC::MrGamoo::User - namespace where your map/reduce job lives

=cut



( run in 0.768 second using v1.01-cache-2.11-cpan-49f99fa48dc )