AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
use strict;
# Module-wide defaults used by the task child process.
# minimum interval between progress reports sent by _maybe_update_status
my $STATUSTIME = 5; # seconds
# default alarm() timeout (seconds) armed in _setup_stdio_etal
my $MAXRUN = 3600; # override with %attr maxrun
# fallback sort program used by _sort_cmd
my $SORTPROG = '/usr/bin/sort'; # override with %attr sortprog or config file
# fallback decompressor used by _sort_cmd when the 'compress' attr is set
my $GZPROG = '/usr/bin/gzcat'; # override with %attr gzprog or config file
# in child process
#
# Run one task from start to finish: set up stdio and the end-user
# console, install the output/progress callbacks on the runner object,
# execute the requested phase, report the outcome to the parent over
# STATUS, then exit.  Never returns.
#
# NOTE(review): when the phase dies, 'FAILED' is reported and then
# 'FINISHED' is still sent and the exit status is 0 - confirm that the
# parent expects this sequence.
sub _start_task {
    my( $me ) = @_;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    my $runner = $me->{R};
    $runner->{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
    $runner->{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $runner->redirect_io();

    # number of output partitions (0 if the request names no outfiles)
    my $n_out = 0;
    $n_out = @{$me->{request}{outfile}} if $me->{request}{outfile};

    $runner->{func_output}   = sub{ _output_partition($me, $n_out, @_) };
    $runner->{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        # pick the handler for this phase; first match wins
        my $phase   = $me->{request}{phase};
        my $handler = $phase eq 'map'     ? \&_do_map
                    : $phase eq 'final'   ? \&_do_final
                    : $phase =~ /^reduce/ ? \&_do_reduce
                    :                       undef;

        die "unknown map/reduce phase '$phase'\n" unless $handler;
        $handler->( $me );
    };

    my $err = $@;
    if( $err ){
        # log locally, tell the end-user console, and tell the parent
        my $myid = my_server_id();
        verbose( "ERROR: $myid - $err" );
        _send_eumsg($me, 'stderr', "ERROR: $myid - $err");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );
    _update_status( 'FINISHED', 0 );
    debug("finish child task");
    exit(0);
}
# Prepare the child's file descriptors, signal handlers, syslog and the
# run-time limit.
#
# The handle inherited as STDOUT is duplicated onto STATUS (the channel
# _update_status writes to), then STDOUT and STDIN are pointed at
# /dev/null.  Dies if any of these opens fails: carrying on would leave
# fd 0/1 closed and free to be reused by whatever file is opened next.
sub _setup_stdio_etal {

    # move socket to parent from STDOUT -> STATUS
    # so user code doesn't trample
    open( STATUS, '>&', \*STDOUT )
        or die "cannot dup STDOUT to STATUS: $!\n";

    close STDOUT;
    open( STDOUT, '>', '/dev/null' )
        or die "cannot reopen STDOUT on /dev/null: $!\n";

    close STDIN;
    open( STDIN, '<', '/dev/null' )
        or die "cannot reopen STDIN on /dev/null: $!\n";

    # status lines must not sit in a buffer
    select STATUS; $| = 1; select STDOUT;

    $SIG{CHLD} = sub{};
    # turn the alarm into an exception
    $SIG{ALRM} = sub{ die "timeout\n" };

    openlog('mrgamoo', 'ndelay, pid', (conf_value('syslog') || 'local4'));

    # default run-time limit
    alarm( $MAXRUN );
}
# Create the end-user console object for this task from the request's
# jobid and console values, and keep it in $me->{euconsole}.
sub _setup_console {
    my( $me ) = @_;

    my $jobid   = $me->{request}{jobid};
    my $console = $me->{request}{console};

    debug("setup console: $jobid, $console");
    $me->{euconsole} = AC::MrGamoo::EUConsole->new( $jobid, $console );
}
# Pass a message of the given type to the end-user console.
# Does nothing when no console object has been set up.
#   $me   - task object (reads $me->{euconsole})
#   $type - message type handed to send_msg (e.g. 'stderr')
#   $msg  - message text
sub _send_eumsg {
    my( $me, $type, $msg ) = @_;

    my $console = $me->{euconsole} or return;
    return $console->send_msg($type, $msg);
}
# Report progress to the parent process: writes one "<phase> <amount>"
# line on the STATUS filehandle.
#   $phase - status word (e.g. 'STARTING', 'RUNNING', 'FAILED', 'FINISHED')
#   $amt   - progress amount
sub _update_status {
    my( $phase, $amt ) = @_;

    debug("sending status @ $^T / $phase/$amt");

    # send status to parent process
    my $line = "$phase $amt\n";
    print STATUS $line;
}
# Rate-limited wrapper around _update_status: refreshes $^T, and only
# forwards the remaining arguments if at least $STATUSTIME seconds have
# passed since the last report recorded in $me->{status_time}.
sub _maybe_update_status {
    my( $me, @status ) = @_;

    $^T = time();

    # too soon since the previous report?
    my $next_due = $me->{status_time} + $STATUSTIME;
    return if $^T < $next_due;

    $me->{status_time} = $^T;
    return _update_status( @status );
}
# Open one output file object per entry in the request's outfile list
# (paths are relative to the configured basedir) and store the list in
# $me->{outfd}.  The 'compress' attr is handed to each OutFile.
sub _setup_outfiles {
    my( $me ) = @_;

    my $compress = $me->attr(undef, 'compress');
    my @outfiles;

    foreach my $name ( @{$me->{request}{outfile}} ){
        my $path = conf_value('basedir') . "/$name";

        # make sure the containing directory exists; errors are ignored here
        my($parent) = $path =~ m|^(.+)/[^/]+$|;
        eval{ mkpath($parent, undef, 0777) };

        push @outfiles, AC::MrGamoo::OutFile->new( $path, $compress );
    }

    $me->{outfd} = \@outfiles;
}
sub _close_outfiles {
my $me = shift;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
my $h_final = $mr->get_code('final');
my $linen = 0;
if( $h_final ){
debug("doing final");
my $maxrun = $me->attr($h_final, 'maxrun');
alarm( $maxrun ) if $maxrun;
my $f_final = $h_final->{code};
# sort
my @cmd = _sort_cmd( $me, $h_final );
open(SORT, '-|', @cmd) || die "cannot open sort pipe: $!\n";
_sort_underway( $me, \*SORT );
while(<SORT>){
chomp;
_maybe_update_status( $me, 'RUNNING', $linen++ );
my $x = decode_json($_);
$f_final->( $x->[0], $x->[1] );
}
$h_final->{cleanup}() if $h_final->{cleanup};
}
}
################################################################
# Build the command that feeds the sorted input records.
#   $me - task object (reads $me->{request}{infile} and attrs)
#   $hc - code handle whose attrs may override sortprog / gzprog
#
# Returns a list suitable for open(FH, '-|', LIST):
#   - compressed input: a single shell pipeline string
#     "<gzprog> <files...> | <sortprog>"
#   - otherwise: ( <sortprog>, <files...> ), run without a shell
#
# File paths in the shell pipeline are single-quoted so that spaces or
# shell metacharacters in a path are taken literally, matching what the
# no-shell list form does.
sub _sort_cmd {
    my $me = shift;
    my $hc = shift;

    my $gz   = $me->attr(undef, 'compress');
    my $sort = $me->attr($hc,'sortprog') || conf_value('sortprog') || $SORTPROG;
    my @file = map { conf_value('basedir') . '/' . $_ } @{$me->{request}{infile}};

    if( $gz ){
        my $zcat = $me->attr($hc,'gzprog') || conf_value('gzprog') || $GZPROG;

        # POSIX sh quoting: wrap in '...', and spell an embedded ' as '\''
        my @quoted = map {
            (my $q = $_) =~ s/'/'\\''/g;
            "'$q'";
        } @file;

        my $cmd = $zcat . ' ' . join(' ', @quoted) . ' | ' . $sort;
        debug("running cmd: $cmd");
        return $cmd;
    }else{
        my @cmd = ($sort, @file);
        debug("running cmd: @cmd");
        return @cmd;
    }
}
# Wait until the sort pipe has something to read (data or EOF), sending
# rate-limited 'RUNNING' progress reports while it is still quiet.
#   $me - task object, passed through to _maybe_update_status
#   $fd - filehandle of the sort pipe
#
# Returns once the handle is readable.  Also returns if the handle has
# no file descriptor or select() fails with anything other than EINTR,
# leaving it to the caller's read to surface the problem.
sub _sort_underway {
    my $me = shift;
    my $fd = shift;

    my $fn = fileno($fd);
    return unless defined $fn;

    my $want = '';
    vec($want, $fn, 1) = 1;

    # send progress updates to master while sort is sorting
    while(1){
        # select() rewrites its mask, so hand it a fresh copy each time
        my $nfound = select(my $ready = $want, undef, undef, 5);

        if( $nfound < 0 ){
            # the mask is not meaningful after an error; only retry
            # when we were merely interrupted by a signal
            return unless $!{EINTR};
        }elsif( $nfound && vec($ready, $fn, 1) ){
            return;
        }

        _maybe_update_status( $me, 'RUNNING', 0);
    }
}
################################################################
1;
( run in 4.294 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )