AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN


    $me->_cleanup_tasks();
    $me->_cleanup_files();

    return if $me->{aborted};
    $me->{aborted}       = 1;

    # move to final state
    $me->{phase_no} = @{$me->{plan}{taskplan}};

    $me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}

################################################################

sub _cleanup_files {
    my $me = shift;

    $me->{_cleanedup} = 1;
    $me->{statistics}{cleanup_start} = time();
    $me->{statistics}{job_time} = time() - $me->{statistics}{job_start};

lib/AC/MrGamoo/Submit/Request.pm  view on Meta::CPAN

sub progress {
    my $me = shift;
    $me->{func_progress}->( @_ ) if $me->{func_progress};
}


sub redirect_io {
    my $me = shift;

    tie *STDOUT, 'AC::MrGamoo::Submit::TieIO', $me->{eu_print_stdout};
    tie *STDERR, 'AC::MrGamoo::Submit::TieIO', $me->{eu_print_stderr};
}

sub print_stderr {
    my $me = shift;

    $me->{eu_print_stderr}->( @_ ) if $me->{eu_print_stderr};
}

sub print {
    my $me = shift;

    $me->{eu_print_stdout}->( @_ ) if $me->{eu_print_stdout};
}

*print_stdout = \&print;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";
        }
    };
    if( my $e = $@ ){
        my $myid = my_server_id();
        verbose( "ERROR: $myid - $e" );
        _send_eumsg($me, 'stderr', "ERROR: $myid - $e");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );
    _update_status( 'FINISHED', 0 );
    debug("finish child task");
    exit(0);
}

sub _setup_stdio_etal {

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

}


# end-user's 'print' come here
sub eu_print_stdout {
    my $me = shift;

    _send_eumsg($me, 'stdout', "@_");
}

sub eu_print_stderr {
    my $me = shift;

    _send_eumsg($me, 'stderr', "@_");
}

################################################################

sub _do_map {
    my $me = shift;
    my $mr = $me->{mr};

    debug("doing map");
    my $n        = @{$me->{request}{outfile}};



( run in 1.174 second using v1.01-cache-2.11-cpan-49f99fa48dc )