AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 16:45 (EST)
# Function: child half of m/r task (the actual running task)
#
# $Id: Running.pm,v 1.5 2011/01/12 19:18:54 jaw Exp $
package AC::MrGamoo::Task::Running;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug 'task_run';
use AC::MrGamoo::Iter::File;
use AC::MrGamoo::EUConsole;
use AC::MrGamoo::ReadInput;
use AC::MrGamoo::OutFile;
use AC::MrGamoo::MySelf;
use AC::Daemon;
use Digest::SHA1 'sha1';
use Digest::MD5 'md5';
use File::Path;
use Sys::Syslog;
use Socket;
use JSON;
use strict;
my $STATUSTIME = 5; # seconds
my $MAXRUN = 3600; # override with %attr maxrun
my $SORTPROG = '/usr/bin/sort'; # override with %attr sortprog or config file
my $GZPROG = '/usr/bin/gzcat'; # override with %attr gzprog or config file
# in child process
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
_do_map( $me );
}elsif( $me->{request}{phase} eq 'final' ){
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
};
if( my $e = $@ ){
my $myid = my_server_id();
verbose( "ERROR: $myid - $e" );
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
_update_status( 'FINISHED', 0 );
debug("finish child task");
exit(0);
}
sub _setup_stdio_etal {
# move socket to parent from STDOUT -> STATUS
# so user code doesn't trample
open( STATUS, ">&STDOUT" );
close STDOUT; open( STDOUT, ">/dev/null");
close STDIN; open( STDIN, "/dev/null");
select STATUS; $| = 1; select STDOUT;
$SIG{CHLD} = sub{};
$SIG{ALRM} = sub{ die "timeout\n" };
openlog('mrgamoo', 'ndelay, pid', (conf_value('syslog') || 'local4'));
alarm( $MAXRUN );
}
sub _setup_console {
my $me = shift;
debug("setup console: $me->{request}{jobid}, $me->{request}{console}");
( run in 1.970 second using v1.01-cache-2.11-cpan-5b529ec07f3 )