view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
# plan out the map phase
my @phase = 'map';
my($planmap, $plancopy) = _plan_map( $job, $servers, $files, $nr, $redbin );
my @task = { phase => 'map', task => $planmap };
# plan out the reduce phases
my $nrp = @{$job->{mr}{content}{reduce}};
for my $r (0 .. $nrp - 1){
push @phase, "reduce/$r";
# last reduce has 1 outfile, otherwise nr.
my $nout = ($r == $nrp - 1) ? 1 : $nr;
push @task, { phase => "reduce/$r", task => _plan_reduce($job, $r, $nout, $redbin, $task[-1]{task}) };
}
# plan out a final phase
if( $job->{mr}{content}{final} ){
push @phase, 'final';
push @task, { phase => 'final', task => _plan_final($job, $redbin, $task[-1]{task}) };
}
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
}
my $id = unique();
push @maptask, AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
phase => 'map',
server => $s,
infile => \@file,
altplan => \%alt,
_total => $tot,
outfile => _plan_outfiles($job, $id, $nr, $redbin, 'map' ),
);
}
}
return (\@maptask, $copies);
}
sub _plan_reduce {
my $job = shift;
my $rno = shift;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
my @reds;
my $sn = 0;
for my $s (@$redbin){
my $id = unique();
push @reds, AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
phase => "reduce/$rno",
server => $s->[0],
altserver => $s->[1],
infile => [ map { $_->{outfile}[$sn]{filename} } @$ptasks ],
outfile => _plan_outfiles($job, $id, $nout, $redbin, "red$rno"),
);
$sn++;
}
return \@reds;
}
sub _plan_final {
my $job = shift;
my $redbin = shift;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
my $jid = $job->{request}{jobid};
my $id = unique();
return [
AC::MrGamoo::Job::TaskInfo->new( $job,
id => $id,
server => $redbin->[0][0],
altserver => $redbin->[0][1],
phase => 'final',
infile => [ map { $_->{outfile}[0]{filename} } @$ptasks ],
outfile => [ ],
),
];
}
sub _plan_outfiles {
my $job = shift;
my $taskid = shift;
my $nout = shift;
my $redbin = shift;
my $pfix = shift;
my @out;
my $jid = $job->{request}{jobid};
for my $n (0 .. $nout - 1){
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
taskid => $me->{id},
jobsrc => $job->{request}{jobsrc},
options => $job->{request}{options},
initres => $job->{request}{initres},
console => ($job->{request}{console} || ''),
phase => $ti->{phase},
infile => $ti->{infile},
outfile => [ map { $_->{filename} } @{$ti->{outfile}} ],
master => my_server_id(),
} );
unless( $x ){
verbose("cannot start task");
$me->failed($job);
return;
}
# no success cb here. we will either timeout, or get a TaskStatus msg.
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
}
sub finished {
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
$me->{finished} = 1;
my $outfiles = $me->{outfile};
my $server = $t->{server};
debug("task finished $me->{id} on $server");
# copy files
for my $fi (@$outfiles){
# add to file_info - file is now on one server
debug(" outfile $fi->{filename}");
$job->{file_info}{ $fi->{filename} } = {
filename => $fi->{filename},
location => [ $server ],
};
$job->{server_info}{$server}{has_files}{$fi->{filename}} = 1;
# QQQ - optionally leave final files?
push @{$job->{tmp_file}}, { filename => $fi->{filename}, server => $server };
# add to copy_pending
foreach my $s ( @{$fi->{dst}} ){
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
my @new;
for my $as (keys %newplan){
my $newid = unique();
my $oldid = $me->{id};
my $new = AC::MrGamoo::Job::TaskInfo->new($job,
id => $newid,
phase => $me->{phase},
infile => $newplan{$as},
replaces => $oldid,
outfile => [ map {
(my $f = $_->{filename}) =~ s/$oldid/$newid/;
{ dst => $_->{dst}, filename => $f, }
} @{$me->{outfile}} ],
server => $as,
);
debug("replan map $oldid => $newid on $as");
# keep plan up to date
$job->{plan}{taskidx}{$newid} = $new;
push @{$job->{plan}{taskplan}[0]{task}}, $new;
# move to pending queue
$new->pend($job) if $job->{phase_no} == 0;
lib/AC/MrGamoo/Job/XferInfo.pm view on Meta::CPAN
delete $me->{instance}{ $x->{id} };
# retry? replan? abort?
my $server = $me->{dst};
my $status = get_peer_status_from_id($server);
my $loc = $job->{file_info}{$me->{filename}}{location} || $me->{location};
verbose("xfer failed $me->{id} $server ($status) $me->{filename} @$loc");
my $skip = $job->{options}{skipmissinginputfiles}; # QQQ
if( $job->{phase_no} == -1 && $skip ){
# ignore
return;
}
if( $status != 200 ){
# replan tasks
$job->_replan_server($server, 'xfer', $me);
return;
}
lib/AC/MrGamoo/OutFile.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-27 11:34 (EST)
# Function: buffer output + open/close files as needed
#
# $Id: OutFile.pm,v 1.2 2011/01/06 17:58:13 jaw Exp $
package AC::MrGamoo::OutFile;
use AC::MrGamoo::Debug 'outfile';
use IO::Compress::Gzip;
use strict;
# Module-level state shared by all AC::MrGamoo::OutFile objects.
# Size threshold for buffered output (units presumably bytes -- its use is
# not visible in this chunk; TODO confirm).
my $BUFMAX = 200000;
# Limit on simultaneously open files; filled in from the shell below.
my $max_open;
# Number of output files currently open (starts at zero).
my $currently_open = 0;
# Registry of OutFile objects (contents/keys not visible here -- TODO confirm).
my %all;
# Ask the shell for the per-process open-file limit.
# NOTE(review): backtick output keeps its trailing newline and may be the
# string "unlimited"; presumably cleaned up/validated after this line -- confirm.
$max_open = `sh -c "ulimit -n"`;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
_do_map( $me );
}elsif( $me->{request}{phase} eq 'final' ){
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
};
if( my $e = $@ ){
my $myid = my_server_id();
verbose( "ERROR: $myid - $e" );
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
_update_status( 'FINISHED', 0 );
debug("finish child task");
exit(0);
}
sub _setup_stdio_etal {
# move socket to parent from STDOUT -> STATUS
# so user code doesn't trample
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# Rate-limited status report.
# Forwards its remaining arguments to _update_status, but only when at least
# $STATUSTIME seconds have passed since the last forwarded report (the time
# of that report is kept in $me->{status_time}). $^T is refreshed with the
# current time on every call, whether or not a report is sent.
sub _maybe_update_status {
    my $me = shift;

    $^T = time();
    my $next_report = $me->{status_time} + $STATUSTIME;

    # too soon since the last report - say nothing
    return if $^T < $next_report;

    $me->{status_time} = $^T;
    return _update_status( @_ );
}
# Create one AC::MrGamoo::OutFile per requested output file.
# Each name in $me->{request}{outfile} is placed under the configured
# 'basedir'. Its parent directory is created first; errors from mkpath are
# deliberately discarded. Every OutFile receives the task's 'compress'
# attribute. The objects are stored, in request order, in $me->{outfd},
# which is also the return value.
sub _setup_outfiles {
    my $me = shift;

    my $compress = $me->attr(undef, 'compress');
    my @handles;

    foreach my $name ( @{ $me->{request}{outfile} } ){
        my $base = conf_value('basedir');
        my $path = "$base/$name";

        # best effort: ignore any failure to create the directory
        my ($parent) = $path =~ m|^(.+)/[^/]+$|;
        eval { mkpath($parent, undef, 0777) };

        push @handles, AC::MrGamoo::OutFile->new( $path, $compress );
    }

    $me->{outfd} = \@handles;
}
# Call close() on every output object in $me->{outfd}, in order, and then
# remove the outfd entry from $me. Calling it when no outfiles were set up
# is harmless.
sub _close_outfiles {
    my $me = shift;

    my $open_files = $me->{outfd} ||= [];
    for my $out (@$open_files) {
        $out->close();
    }

    return delete $me->{outfd};
}
# Route one ( $key, $data ) record to one of the task's output files.
#   $me   - task object; $me->{outfd} holds one output object per file
#   $n    - number of output files (partitions)
#   $key  - partition key
#   $data - value stored with the key
# The record is written as a single JSON line: [ key, data ]\n.
# The partition index comes from an md5 of the key, not from perl's own
# hashing, so that every task in every process sends a given key to the same
# index: reduce task N reads output file N of every task from the previous
# phase.
# Dies (with a user-readable message) when the task has no output files.
sub _output_partition {
    my ($me, $n, $key, $data) = @_;

    # A task planned without output files (eg. the final phase) has nowhere
    # to put records. Say so instead of dying with "Illegal modulus zero".
    die "cannot output: this task has no output files\n" unless $n;

    # md5 is twice as fast as sha1.
    # anything written in perl is 10 times slower
    my $hash = unpack('N', md5( $key ));    # first 4 bytes, big-endian
    my $p    = $hash % $n;
    my $io   = $me->{outfd}[$p];

    $io->output( encode_json( [ $key, $data ] ), "\n" );
}
# The end-user's 'print' comes here: the arguments are joined with $" (a
# single space by default), exactly as "@_" interpolation would do, and the
# text is sent on as a 'stdout' console message.
sub eu_print_stdout {
    my ($me, @args) = @_;

    my $text = join $", @args;
    _send_eumsg($me, 'stdout', $text);
}
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
_send_eumsg($me, 'stderr', "@_");
}
################################################################
sub _do_map {
my $me = shift;
my $mr = $me->{mr};
debug("doing map");
my $n = @{$me->{request}{outfile}};
my $h_filter = $mr->get_code('filefilter');
my $h_read = $mr->get_code('readinput') || { code => \&readinput };
my $h_map = $mr->get_code('map');
my $f_filter = $h_filter ? $h_filter->{code} : undef;
my $f_read = $h_read->{code};
my $f_map = $h_map->{code};
my $linen = 0;
my $maxrun = $me->attr($h_map, 'maxrun');
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
$h_map->{cleanup}->() if $h_map->{cleanup};
$h_read->{cleanup}->() if $h_read->{cleanup};
$h_filter->{cleanup}->() if $h_filter && $h_filter->{cleanup};
}
sub _do_reduce {
my $me = shift;
my $mr = $me->{mr};
my $n = @{$me->{request}{outfile}};
my($stage) = $me->{request}{phase} =~ m|reduce/(\d+)|;
my $h_reduce = $mr->get_code('reduce', $stage);
my $f_reduce = $h_reduce->{code};
my $rown = 0;
my $maxrun = $me->attr($h_reduce, 'maxrun');
alarm( $maxrun ) if $maxrun;
debug( "doing reduce step $stage" );
lib/AC/protobuf/mrgamoo.pl view on Meta::CPAN
'initres', 5, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'phase', 6, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'outfile', 7, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'infile', 8, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'master', 9, undef