AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $me = shift;
my $fd = $me->{console_fd};
while(1){
my $buf;
recv $fd, $buf, 65535, 0;
my $proto = AC::MrGamoo::Protocol->decode_header($buf);
my $data = substr($buf, AC::MrGamoo::Protocol->header_size());
my $req = AC::MrGamoo::Protocol->decode_request($proto, $data);
last if $req->{type} eq 'finish';
print STDERR "$req->{msg}" if $req->{type} eq 'stderr';
print "$req->{msg}" if $req->{type} eq 'stdout';
$me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
}
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
$r->{eu_print_stdout} = sub { print STDERR "@_\n" };
# run init section
my $h_init = $mr->get_code( 'init' );
my $initres = ($h_init ? $h_init->{code}() : undef) || {};
$me->{id} = unique();
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobcreate',
msgidno => $^T,
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
$me->_cleanup_tasks();
$me->_cleanup_files();
return if $me->{aborted};
$me->{aborted} = 1;
# move to final state
$me->{phase_no} = @{$me->{plan}{taskplan}};
$me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}
################################################################
sub _cleanup_files {
my $me = shift;
$me->{_cleanedup} = 1;
$me->{statistics}{cleanup_start} = time();
$me->{statistics}{job_time} = time() - $me->{statistics}{job_start};
lib/AC/MrGamoo/Submit/Request.pm view on Meta::CPAN
sub progress {
my $me = shift;
$me->{func_progress}->( @_ ) if $me->{func_progress};
}
sub redirect_io {
my $me = shift;
tie *STDOUT, 'AC::MrGamoo::Submit::TieIO', $me->{eu_print_stdout};
tie *STDERR, 'AC::MrGamoo::Submit::TieIO', $me->{eu_print_stderr};
}
sub print_stderr {
my $me = shift;
$me->{eu_print_stderr}->( @_ ) if $me->{eu_print_stderr};
}
sub print {
my $me = shift;
$me->{eu_print_stdout}->( @_ ) if $me->{eu_print_stdout};
}
*print_stdout = \&print;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
};
if( my $e = $@ ){
my $myid = my_server_id();
verbose( "ERROR: $myid - $e" );
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
_update_status( 'FINISHED', 0 );
debug("finish child task");
exit(0);
}
sub _setup_stdio_etal {
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
}
# end-user's 'print' come here
sub eu_print_stdout {
my $me = shift;
_send_eumsg($me, 'stdout', "@_");
}
sub eu_print_stderr {
my $me = shift;
_send_eumsg($me, 'stderr', "@_");
}
################################################################
sub _do_map {
my $me = shift;
my $mr = $me->{mr};
debug("doing map");
my $n = @{$me->{request}{outfile}};
( run in 1.174 second using v1.01-cache-2.11-cpan-49f99fa48dc )