eg/example.mrjob view on Meta::CPAN
%# -*- mason -*-
# Copyright (c) 2009 by AdCopy
# Author: Jeff Weisberg
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $
<%doc>
map reduce example
</%doc>
%################################################################
%# provide values for configurable parameters
%# these override the defaults;
%# params specified on the command line override these
<%config>
system => blargh
tasktimeout => 120
</%config>
%################################################################
eg/example.mrjob view on Meta::CPAN
%# used to load modules
<%common>
use AC::Misc;
use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
$R->print("starting map/reduce job");
return {
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
sortprog => /bin/sort
</%attr>
my $data = shift; # one record from the input
# return a key + a value
return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
my $key = shift;
my $itr = shift; # an iterator object
# count
my $n = 0;
$itr->foreach( sub { $n ++ } );
# return a key + a value
eg/filelist.pm view on Meta::CPAN
# the keys in yenta are of the form: 20100126150139_[...]
my $start = isotime($tmin); # 1286819830 => 20101011T175710Z
$start =~ s/^(\d+)T(\d+).*/$1$2/; # 20101011T175710Z => 20101011175710
my @files = grep {
# does this file match the request?
($_->{subsystem} eq $syst) &&
($_->{end_time} >= $tmin) &&
($_->{start_time} <= $tmax)
} map {
# get meta-data on this file. data is json encoded
my $d = $yenta->get($_);
$d = $d ? decode_json($d) : {};
# convert space separated locations to arrayref
$d->{location} = [ (split /\s+/, $d->{location}) ];
$d;
} $yenta->getrange($start, undef); # get all files from $start to now
return \@files;
eg/readinput.pm view on Meta::CPAN
# our file is newline delimited json data
# read next line
my $line = scalar <$fd>;
# end of file?
return (undef, 1) unless defined $line;
my $d = json_decode($line);
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return (undef, 0) if $d->{tstart} < $R->config('start');
return (undef, 0) if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo.pm view on Meta::CPAN
=item port
specify the TCP port to use
port 3504
=item environment
specify the environment or realm to run in, so you can run multiple
independent map/reduce networks, such as production, staging, and dev.
environment prod
=item allow
specify networks allowed to connect.
allow 127.0.0.1
allow 192.168.10.0/24
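Putting the directives above together, a config file for a small setup
might look like the following sketch (the values are illustrative, and
only the directives shown in this excerpt are included):
    port        3504
    environment prod
    allow       127.0.0.1
    allow       192.168.10.0/24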
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $
package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );
my $mode = $config->{datamode};
my $syst = $config->{system};
my $tmax = $config->{end};
my $tmin = $config->{start};
my $start = isotime($tmin);
$start =~ s/^(\d+)T(\d+).*/$1$2/; # 20091109T123456... => 20091109123456
# NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw
$syst = undef if $syst eq '*';
$mode = undef if $mode eq '*';
$syst =~ s/[ ,]/\|/g;
if( $syst ){
$syst = qr/^($syst)$/;
}
debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
my @files = grep {
(!$mode || ($_->{environment} eq $mode)) &&
(!$syst || ($_->{subsystem} =~ $syst)) &&
($_->{end_time} >= $tmin) &&
($_->{start_time} <= $tmax)
} map {
#debug("file: $_");
my $d = $yenta->get($_);
$d = $d ? decode_json($d) : {};
$d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
$d;
} $yenta->getrange($start, undef);
debug("found " .scalar(@files)." files");
return \@files;
}
1;
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
return (undef, 1) unless defined $line;
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
}
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return if $d->{tstart} < $R->config('start');
return if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
last if @serverlist;
}
# sort+filter list
@serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }
grep { $_->{status} == 200 } @serverlist;
# try all addresses
# RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?
my @addrlist = map { @{$_->{ip}} } @serverlist;
for my $ip (@addrlist){
my $addr = inet_itoa($ip->{ipv4});
my $res;
eval {
alarm(30);
$res = $me->_submit_to( $addr, $ip->{port}, $req );
alarm(0);
};
next unless $res && $res->{status_code} == 200;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
}
################################################################
sub check_code {
my $me = shift;
my $mr = $me->{program};
my $nr = @{ $mr->{content}{reduce} };
$me->_check('map');
$me->_check('reduce', $_) for (0 .. $nr - 1);
$me->_check('final');
return 1;
}
sub _check {
my $me = shift;
my $mr = $me->{program};
lib/AC/MrGamoo/Default/FileList.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $
package AC::MrGamoo::Default::FileList;
use strict;
# return an array of:
# {
# filename => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
lib/AC/MrGamoo/FileList.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.1 2010/11/01 18:41:42 jaw Exp $
package AC::MrGamoo::FileList;
use AC::MrGamoo::Customize;
use AC::Import;
use strict;
our @ISA = 'AC::MrGamoo::Customize';
our @EXPORT = qw(get_file_list);
lib/AC/MrGamoo/FileList.pm view on Meta::CPAN
use lib '/myperldir';
my $m = AC::MrGamoo::D->new(
class_filelist => 'Local::MrGamoo::FileList',
);
=head1 IMPORTANT
You can fire up the system, get the servers talking to each other, and
perform some limited tests without this file.
But you must provide this file in order to actually run map/reduce jobs.
=head1 DESCRIPTION
MrGamoo only runs map/reduce jobs.
It is up to you to get the files onto the servers, keep track of
where they are, and tell MrGamoo where to find them.
Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.
When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.
Each element of the returned arrayref should be a hashref
containing at least the following fields:
=head2 filename
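As an illustration of the whole hashref (not only the C<filename> field),
here is a sketch modeled on the eg/filelist.pm example and on
AC::MrGamoo::AC::FileList. The server ids are hypothetical, and the fields
beyond filename, location, and size simply follow those examples:
    {
        # path relative to the configured basedir
        filename   => 'www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi',
        # server ids (as in the peer list) that hold a copy of the file
        location   => [ 'mrgamoo@serverA', 'mrgamoo@serverB' ],
        size       => 1_234_567,      # bytes; used to balance the map plan
        # extra fields used by the example filters
        subsystem  => 'www',
        start_time => 1263769200,
        end_time   => 1263772800,
    }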
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
#
# $Id: Plan.pm,v 1.1 2010/11/01 18:41:56 jaw Exp $
package AC::MrGamoo::Job::Plan;
use AC::MrGamoo::Debug 'plan';
use AC::Misc;
use strict;
my $REDUCEFACTOR = 1.9; # QQQ - config?
my $MAPTARGETMIN = 8; # try to have at least this many maps/server
my $MAPSIZELIMIT = 100_000_000;
sub new {
my $class = shift;
my $job = shift;
my $servers = shift;
my $files = shift;
return unless @$servers;
# how many reduces?
my $nr = _number_of_reduces( $job->{options}, scalar @$servers );
# map servers to reduce bins
my $redbin = _pick_reduce_bins( $nr, $servers );
# plan out the map phase
my @phase = 'map';
my($planmap, $plancopy) = _plan_map( $job, $servers, $files, $nr, $redbin );
my @task = { phase => 'map', task => $planmap };
# plan out the reduce phases
my $nrp = @{$job->{mr}{content}{reduce}};
for my $r (0 .. $nrp - 1){
push @phase, "reduce/$r";
# last reduce has 1 outfile, otherwise nr.
my $nout = ($r == $nrp - 1) ? 1 : $nr;
push @task, { phase => "reduce/$r", task => _plan_reduce($job, $r, $nout, $redbin, $task[-1]{task}) };
}
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
# summary
my %task;
for my $ts (@task){
for my $t ( @{$ts->{task}} ){
$task{ $t->{id} } = $t;
}
}
# debug("plan: " . dumper( \@task ));
debug("infiles: " . @$files . ", precopy: " . @$plancopy . ", maps: " . @$planmap . ", reduces: $nr x $nrp");
return bless {
nserver => scalar(@$servers),
nreduce => $nr,
copying => $plancopy,
phases => \@phase,
taskplan => \@task,
redbin => $redbin,
taskidx => \%task,
}, $class;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
# pick alt location
next unless @$servers > 1;
$redbin[$bin][1] = $servers->[ ($bin + 1) % @$servers ]->{id};
}
shuffle(\@redbin);
return \@redbin;
}
sub _plan_map {
my $job = shift;
my $servers = shift;
my $files = shift;
my $nr = shift;
my $redbin = shift;
# plan map
# divy files among servers
# split server + files into tasks
my( $filemap, $copies ) = _plan_divy_files( $job, $files, $servers );
my @maptask;
for my $s (keys %$filemap){
my $totalsize = 0;
$totalsize += $_->{size} for @{$filemap->{$s}};
my $sizelimit = $totalsize / $MAPTARGETMIN;
$sizelimit = $MAPSIZELIMIT if $sizelimit > $MAPSIZELIMIT;
my @todo = sort { $b->{size} <=> $a->{size} } @{$filemap->{$s}};
while( @todo ){
my @file;
my %alt;
my $tot;
while( @todo && ($tot < $sizelimit) ){
my $f = shift @todo;
$tot += $f->{size};
push @file, $f->{filename};
# backup plan?
my $as = $f->{location}[1];
$alt{$f->{filename}} = $as if $as;
}
my $id = unique();
push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
phase => 'map',
server => $s,
infile => \@file,
altplan => \%alt,
_total => $tot,
outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
);
}
}
return (\@maptask, $copies);
}
sub _plan_reduce {
my $job = shift;
my $rno = shift;
my $nout = shift;
my $redbin = shift;
my $ptasks = shift;
my $jid = $job->{request}{jobid};
my @reds;
my $sn = 0;
for my $s (@$redbin){
my $id = unique();
push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
phase => "reduce/$rno",
server => $s->[0],
altserver => $s->[1],
infile => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
outfile => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
);
$sn++;
}
return \@reds;
}
sub _plan_final {
my $job = shift;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
my $jid = $job->{request}{jobid};
my $id = unique();
return [
AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
server => $redbin->[0][0],
altserver => $redbin->[0][1],
phase => 'final',
infile => [ map { $_->{outfile}[0]{filename} } @$ptasks ],
outfile => [ ],
),
];
}
sub _plan_outfiles {
my $job = shift;
my $taskid = shift;
my $nout = shift;
my $redbin = shift;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
my @out;
my $jid = $job->{request}{jobid};
for my $n (0 .. $nout - 1){
push @out, { filename => "mrtmp/j_$jid/${pfix}_${taskid}_$n", dst => [ @{$redbin->[$n]} ] };
}
return \@out;
}
sub _plan_map_these_servers {
my $job = shift;
my $servers = shift;
# limit number of servers?
my $nm = ($job->{options}{maps} + 0) || @$servers;
my %data;
for my $s ( sort { $a->{metric} <=> $b->{metric} } @$servers ){
$data{ $s->{id} } = { metric => $s->{metric}, use => ($nm ? 1 : 0) };
$nm -- if $nm;
}
return \%data;
}
sub _plan_divy_files {
my $job = shift;
my $files = shift;
my $servers = shift;
my %filemap;
my %bytes;
my @copies;
my $load = _plan_map_these_servers( $job, $servers );
# divy files up among servers
for my $f (sort { $b->{size} <=> $a->{size} } @$files){
my($best_wgt, $best_loc);
for my $loc ( @{$f->{location}} ){
next unless exists $load->{$loc}; # down?
next unless $load->{$loc}{use};
my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
if( !$best_loc || $w < $best_wgt ){
$best_wgt = $w;
$best_loc = $loc;
}
}
if( $best_loc ){
# a server has the file. process it there.
push @{$filemap{$best_loc}}, $f;
$bytes{$best_loc} += $f->{size};
next;
}
# pick best 2 servers
my($sa, $sb) =
map { $_->[1] }
sort{ $a->[0] <=> $b->[0] }
map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
grep { $load->{$_}{use} }
(keys %$load);
# copy the file
my @loc = $sa;
push @loc, $sb if $sb;
my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");
my $ff = {
filename => $newfile,
location => \@loc,
size => $f->{size},
};
push @{$filemap{$sa}}, $ff;
$bytes{$sa} += $ff->{size};
# need to copy this file from its current location to the server(s) that will run map on it
for my $loc (@loc){
push @copies, AC::MrGamoo::Job::XferInfo->new( $job,
id => unique(),
filename => $f->{filename},
dstname => $newfile,
size => $f->{size},
location => $f->{location},
dst => $loc,
);
}
}
return (\%filemap, \@copies);
}
1;
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
taskid => $me->{id},
jobsrc => $job->{request}{jobsrc},
options => $job->{request}{options},
initres => $job->{request}{initres},
console => ($job->{request}{console} || ''),
phase => $ti->{phase},
infile => $ti->{infile},
outfile => [ map { $_->{filename} } @{$ti->{outfile}} ],
master => my_server_id(),
} );
unless( $x ){
verbose("cannot start task");
$me->failed($job);
return;
}
# no success cb here. we will either timeout, or get a TaskStatus msg.
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
if $me->{replaces};
return $me->_replan_altserver($job) if $me->{altserver};
if( $me->{phase} eq 'reduce' ){
verbose("cannot replan task. no altserver");
$job->abort(reason => "cannot replan task. no alternate server available");
return;
}
$me->_replan_map($job);
}
sub _replan_altserver {
my $me = shift;
my $job = shift;
$me->{server} = $me->{altserver};
delete $me->{retries};
delete $me->{altserver};
debug("replanning task to new server");
$me->pend($job);
}
sub _replan_map {
my $me = shift;
my $job = shift;
# remove task
# divy files among servers
# create new tasks
# rediddle next phase
my %newplan; # server => @files
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
my @new;
for my $as (keys %newplan){
my $newid = unique();
my $oldid = $me->{id};
my $new = AC::MrGamoo::Job::TaskInfo->new($job,
id => $newid,
phase => $me->{phase},
infile => $newplan{$as},
replaces => $oldid,
outfile => [ map {
(my $f = $_->{filename}) =~ s/$oldid/$newid/;
{ dst => $_->{dst}, filename => $f, }
} @{$me->{outfile}} ],
server => $as,
);
debug("replan map $oldid => $newid on $as");
# keep plan up to date
$job->{plan}{taskidx}{$newid} = $new;
push @{$job->{plan}{taskplan}[0]{task}}, $new;
# move to pending queue
$new->pend($job) if $job->{phase_no} == 0;
push @new, $new;
}
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
$me->_replan_replace_files( $job, @new );
}
sub _replan_replace_files {
my $me = shift;
my $job = shift;
my @new = @_;   # the replacement tasks created by _replan_map
my $oldid = $me->{id};
my $curphase = 0; # map
my $nxtphase = 1; # reduce/0
# remove old task's files, add new tasks' files
for my $ti ( @{$job->{plan}{taskplan}[$nxtphase]{task}} ){
my @infile;
for my $file (@{$ti->{infile}}){
if( $file =~ /$oldid/ ){
for my $new (@new){
my $newid = $new->{id};
(my $n = $file) =~ s/$oldid/$newid/;
lib/AC/MrGamoo/Kibitz/Peers.pm view on Meta::CPAN
);
}
return $txt;
}
sub report_json {
my $all = peer_list_all();
my @fields = qw(hostname environment subsystem datacenter server_id status sort_metric);
return encode_json( [ map {
my %x;
@x{@fields} = @{$_}{@fields};
$x{ip} = [
map { {
ipv4 => inet_itoa($_->{ipv4}),
port => $_->{port},
natdom => $_->{natdom},
} } @{$_->{ip}}
];
\%x;
} @$all ] ) . "\n";
}
################################################################
lib/AC/MrGamoo/MySelf.pm view on Meta::CPAN
Return the name of the local datacenter. MrGamoo will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.
sub my_datacenter {
my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
return $domain;
}
Note: map/reduce jobs are extremely network intensive. It is not
recommended to spread your servers out. You really want them all
plugged into one big switch. One big, fast switch.
=head2 my_network_info
Return information about the various networks this server has.
sub my_network_info {
my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));
lib/AC/MrGamoo/PeerList.pm view on Meta::CPAN
# return an array of:
# {
# id => mrgamoo@a2be021ad31c
# metric => 2
# }
sub get_peer_list {
my $s = peer_list_all();
return [ map {
{
id => $_->{server_id},
metric => $_->{sort_metric},
}
} grep { $_->{status} == 200 } @$s ];
}
sub get_peer_addr_from_id {
my $id = shift;
lib/AC/MrGamoo/ReadInput.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 12:00 (EST)
# Function: read files to map
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:44 jaw Exp $
package AC::MrGamoo::ReadInput;
use AC::MrGamoo::Customize;
use AC::Import;
use strict;
our @ISA = 'AC::MrGamoo::Customize';
our @EXPORT = qw(readinput);
lib/AC/MrGamoo/ReadInput.pm view on Meta::CPAN
emacs /myperldir/Local/MrGamoo/ReadInput.pm
copy. paste. edit.
use lib '/myperldir';
my $m = AC::MrGamoo::D->new(
class_readinput => 'Local::MrGamoo::ReadInput',
);
=head1 DESCRIPTION
In your map/reduce job, your C<map> function is called once per record.
The C<readinput> function is responsible for reading the actual files
and returning records.
The default C<readinput> returns one line at a time (just like <FILE>).
If you want different behavior, you can provide a C<ReadInput> class,
or specify a C<readinput> block in your map/reduce job.
Your function should return an array of two values:
=head2 record
the record data
=head2 eof
have we reached the end-of-file
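As a sketch of a custom C<readinput>, modeled on the eg/readinput.pm
example above and assuming newline-delimited JSON input (the JSON module
and the per-file filehandle argument follow that example):
    use JSON;
    sub readinput {
        my $fd   = shift;                       # filehandle of the current input file
        my $line = scalar <$fd>;
        return (undef, 1) if !defined $line;    # record=undef, eof=true
        return (decode_json($line), 0);         # one decoded record, not eof
    }
Returning C<(undef, 0)> skips a record without ending the file, which is
how the example filters records by date range.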
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
package AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Compile::Block;
use strict;
my %COMPILE = (
config => { tag => 'config', multi => 1, },
doc => { tag => 'block', multi => 1, },
init => { tag => 'block', multi => 0, },
common => { tag => 'simple', multi => 0, },
map => { tag => 'block', multi => 0, required => 1, },
reduce => { tag => 'block', multi => 1, required => 1, },
final => { tag => 'block', multi => 0, },
readinput => { tag => 'block', multi => 0, },
filefilter => { tag => 'block', multi => 0, },
);
my %BLOCK = (
init => 'simple',
cleanup => 'simple',
attr => 'config',
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
_do_map( $me );
}elsif( $me->{request}{phase} eq 'final' ){
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
};
if( my $e = $@ ){
my $myid = my_server_id();
verbose( "ERROR: $myid - $e" );
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
}
sub eu_print_stderr {
my $me = shift;
_send_eumsg($me, 'stderr', "@_");
}
################################################################
sub _do_map {
my $me = shift;
my $mr = $me->{mr};
debug("doing map");
my $n = @{$me->{request}{outfile}};
my $h_filter = $mr->get_code('filefilter');
my $h_read = $mr->get_code('readinput') || { code => \&readinput };
my $h_map = $mr->get_code('map');
my $f_filter = $h_filter ? $h_filter->{code} : undef;
my $f_read = $h_read->{code};
my $f_map = $h_map->{code};
my $linen = 0;
my $maxrun = $me->attr($h_map, 'maxrun');
alarm( $maxrun ) if $maxrun;
for my $file (@{$me->{request}{infile}}){
_maybe_update_status( $me, 'RUNNING', $linen );
$me->{R}{config}{current_file} = $file; # in case the user wants it for debugging
# filter file list
if( $f_filter ){
next unless $f_filter->( $file );
}
debug("map file: $file");
my $f = conf_value('basedir') . '/' . $file;
open(my $fd, $f) || die "cannot open file '$f': $!\n";
while(1){
_maybe_update_status( $me, 'RUNNING', $linen++ );
# read input
my($d, $eof) = $f_read->( $fd );
last if $eof;
next unless defined $d;
# map
my($key, $data) = $f_map->( $d );
next unless defined $key;
_output_partition( $me, $n, $key, $data );
}
}
$h_map->{cleanup}->() if $h_map->{cleanup};
$h_read->{cleanup}->() if $h_read->{cleanup};
$h_filter->{cleanup}->() if $h_filter && $h_filter->{cleanup};
}
sub _do_reduce {
my $me = shift;
my $mr = $me->{mr};
my $n = @{$me->{request}{outfile}};
my($stage) = $me->{request}{phase} =~ m|reduce/(\d+)|;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
}
################################################################
sub _sort_cmd {
my $me = shift;
my $hc = shift;
my $gz = $me->attr(undef, 'compress');
my $sort = $me->attr($hc,'sortprog') || conf_value('sortprog') || $SORTPROG;
my @file = map { conf_value('basedir') . '/' . $_ } @{$me->{request}{infile}};
if( $gz ){
my $zcat = $me->attr($hc,'gzprog') || conf_value('gzprog') || $GZPROG;
my $cmd = $zcat . ' ' . join(' ', @file) . ' | ' . $sort;
debug("running cmd: $cmd");
return $cmd;
}else{
my @cmd = ($sort, @file);
debug("running cmd: @cmd");
return @cmd;
lib/AC/MrGamoo/User.pm view on Meta::CPAN
no warnings;
# export R
*{$caller . '::R'} = \$AC::MrGamoo::User::R
}
1;
=head1 NAME
AC::MrGamoo::User - namespace where your map/reduce job lives
=cut