AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/example.mrjob  view on Meta::CPAN

%# used to load modules
<%common>
    use AC::Misc;
    use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
    # announce the start of the job through the request object $R
    $R->print("starting map/reduce job");

    # this hashref is the return value of the init block; the cleanup
    # sub-block of this example reads it back with $R->initvalue()
    return {
        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300

eg/example.mrjob  view on Meta::CPAN

<%init>
    # init sub-block runs at start of final block
    my $report;
</%init>
<%cleanup>
    # the cleanup sub-block runs at the end of the final block

    # fetch the value that was returned by the init block
    my $iv = $R->initvalue();

    # stdout is tied to $R, so a plain print works as well as $R->print.

    # emit a heading, then the report text accumulated in $report
    print "report for mood: $iv->{mood}\n";
    print $report;

</%cleanup>
    my($key, $data) = @_;

    $report .= "key: $key, value: $data\n";

</%final>

eg/mrgamoo  view on Meta::CPAN

    $mrm->check_code();
    exit;
}

# open the console before submitting, when --console was requested
$mrm->open_console() if $opt{console};

# run job
# submit() is handed the seed server list and yields a job id on success
my $id = $mrm->submit( seedlist() );
die "could not run job\n" unless $id;

print STDERR "job: $id\n" if $opt{verbose};
# on INT or QUIT, abort the submitted job and then exit
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
# NOTE(review): run_console presumably blocks, relaying job output, until the job finishes - confirm
$mrm->run_console() if $opt{console};

exit;

################################################################

sub seedlist {
    # determine list of servers to try
    return '10.200.2.3:3504';

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $src  = shift;
    my $cfg  = shift;

    my $host = hostname();
    my $user = getpwuid($<);
    my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');

    my $me   = bless {
        traceinfo	=> $trace,
    }, $class;
    $me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};

    # compile job
    my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
    $me->{program} = $mr;

    # merge job %config section with passed in config
    $mr->add_config($cfg);

    return $me;
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list
    my $files = get_file_list( $cf );
    #print STDERR "files: ", dumper($files), "\n";

    for my $f (@$files){
        $me->{file_info}{ $f->{filename} } = $f;
    }

    # get server list
    my $servers = get_peer_list( $cf );
    #print STDERR "servers: ", dumper($servers), "\n";

    # plan job
    my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
    #print STDERR "plan: ", dumper($plan), "\n";

    $me->{plan} = $plan;

    $me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});

    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();
    $REGISTRY{ $me->{request}{jobid} } = $me;
    return $me;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

        $ph ||= 'cleanup' if $j->{phase_no} >= @{$j->{plan}{phases}};
        $ph ||= $j->{plan}{phases}[ $j->{phase_no} ];

        my $tr = keys %{$j->{task_running}};
        my $tp = keys %{$j->{task_pending}};
        my $cr = keys %{$j->{copy_running}};
        my $cp = keys %{$j->{copy_pending}};
        my $rr = keys %{$j->{request_running}};
        my $rp = keys %{$j->{request_pending}};

        $txt .= sprintf("%s %8s %4d %4d %4d %4d %4d %4d\n", $j->{request}{jobid}, $ph, $tr, $tp, $cr, $cp, $rr, $rp);
        # ...
    }

    return $txt;
}


1;

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

}

# Build a plain-text report of all peers returned by peer_list_all().
# One line per peer: server_id, subsystem, environment, datacenter,
# status, sort_metric, followed by ($^T - lastup) and ($^T - timestamp).
# Returns the report as a string; an empty string when there are no peers.
sub report {

    my $all = peer_list_all();
    # start from '' so that an empty peer list gives an empty report
    # instead of undef (which warns as soon as the caller prints it)
    my $txt = '';
    for my $p (@$all){
        my $lu = $^T - $p->{lastup};
        my $lh = $^T - $p->{timestamp};

        $txt .= sprintf("%-30s %s %s %s %3d %7.2f %d %d\n",
                        $p->{server_id}, $p->{subsystem}, $p->{environment},
                        $p->{datacenter}, $p->{status}, $p->{sort_metric},
                        $lu, $lh,
                        );
    }
    return $txt;
}

sub report_json {

lib/AC/MrGamoo/OutFile.pm  view on Meta::CPAN

    $me->_touch() unless $me->{been_opened};
    $me->_close();
}

# Append the given strings to this output file.
# If the file handle is already open the data is printed straight to it;
# otherwise it is collected in $me->{buffer}, and _flush() is called once
# the buffer has reached $BUFMAX in length.
sub output {
    my($me, @chunks) = @_;

    # $^T has been updated with the current time
    $me->{lastused} = $^T;

    my $fd = $me->{fd};
    return print $fd @chunks if $fd;

    # not open yet: buffer the data
    for my $chunk (@chunks){
        $me->{buffer} .= $chunk;
    }
    $me->_flush() if length($me->{buffer}) >= $BUFMAX;
}

################################################################

sub DESTROY {
    my $me = shift;

lib/AC/MrGamoo/OutFile.pm  view on Meta::CPAN


# Write the buffered data to the file and drop the buffer.
# Does nothing when there is no buffered data. _close_things() is called
# first if $currently_open has reached $max_open, then the file is opened
# via _open() and the buffer is printed to $me->{fd}.
# Returns the value of the deleted buffer (the data that was written).
sub _flush {
    my $me = shift;

    # test for content rather than truth: a buffer holding just "0" is
    # false in perl, but is still data that has to be written
    return unless defined($me->{buffer}) && length($me->{buffer});
    _close_things() if $currently_open >= $max_open;

    $me->_open();
    my $fd = $me->{fd};

    print $fd $me->{buffer};
    delete $me->{buffer};
}

# to make sure file gets created
# open the file without writing anything, so that it gets created
sub _touch {
    my($me) = @_;

    if( $currently_open >= $max_open ){
        _close_things();
    }

    $me->_open();
}

lib/AC/MrGamoo/Stats.pm  view on Meta::CPAN

    my $stat = shift;

    $STATS{$stat} ++;
}


################################################################

# report the value of loadave() as a "loadave:" line followed by a blank line
sub http_load {
    my @load = loadave();

    return sprintf("loadave:    %0.4f\n\n", @load);
}

# dump every counter in %STATS, sorted by name, one "name: value" line each,
# terminated by a blank line
sub http_stats {

    my @lines = map { sprintf("%-24s%s\n", "$_:", $STATS{$_}) } sort keys %STATS;

    return join('', @lines, "\n");
}

# fixed status response: always reports OK
sub http_status {
    my $body = "status: OK\n\n";

    return $body;
}

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

    delete $me->{_line};
    delete $me->{_fd};

    1;
}

# Build a "#line N FILE" directive for the code that follows.
# N is $me->{_lineno} + 1 and FILE is $me->{file}.
# Returns the directive as a newline-terminated string.
sub _lineno_info {
    my $me  = shift;

    # should have the number of the _next_ line
    # the file name is passed as an argument, not interpolated into the
    # format, so that a '%' in the name is not taken as a conversion
    return sprintf "#line %d %s\n", $me->{_lineno} + 1, $me->{file};
}

sub _compile_block {
    my $me  = shift;
    my $tag = shift;

    my $b = AC::MrGamoo::Submit::Compile::Block->new();

    $b->{code} = $me->_lineno_info();

lib/AC/MrGamoo/Submit/Request.pm  view on Meta::CPAN

# and indicate progress via $R->progress
# hand the arguments to the func_progress callback, if one is installed
sub progress {
    my $me = shift;

    my $callback = $me->{func_progress};
    return $callback unless $callback;
    return $callback->( @_ );
}


# tie STDOUT and STDERR to the TieIO class, handing each the matching
# eu_print_* callback of this request
sub redirect_io {
    my($me) = @_;

    my $tieclass = 'AC::MrGamoo::Submit::TieIO';

    tie *STDOUT, $tieclass, $me->{eu_print_stdout};
    tie *STDERR, $tieclass, $me->{eu_print_stderr};
}

# pass the arguments to the eu_print_stderr callback, if one is installed
sub print_stderr {
    my $me = shift;

    my $hook = $me->{eu_print_stderr};
    return $hook unless $hook;
    return $hook->( @_ );
}

# pass the arguments to the eu_print_stdout callback, if one is installed
sub print {
    my $me = shift;

    my $hook = $me->{eu_print_stdout};
    return $hook unless $hook;
    return $hook->( @_ );
}

# print_stdout is an alias for print: both names refer to the same sub
*print_stdout = \&print;


1;

lib/AC/MrGamoo/Submit/TieIO.pm  view on Meta::CPAN


    return unless $me->{func};
    $me->{func}->( @_ );
}

# printf on the tied handle: format the arguments with sprintf and hand
# the resulting string to the configured callback (if there is one)
sub PRINTF {
    my($me, $fmt, @args) = @_;

    return unless $me->{func};
    $me->{func}->( sprintf($fmt, @args) );
}

1;

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    }

    # measure
    for my $file (@{$me->{request}{infile}}){
        my $s = (stat(conf_value('basedir') . '/' . $file))[7];
        $me->{_inputsize} += $s
    }

    debug("input size: $me->{_inputsize}");

    # print STDERR "Task: ", dumper($me), "\n";
    $REGISTRY{$task} = $me;
    return $me;
}

sub start {
    my $me = shift;

    # if too many tasks are running, queue
    my $nrun = 0;
    for my $t (values %REGISTRY){

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    return unless $me->{euconsole};
    $me->{euconsole}->send_msg($type, $msg);
}

# report the current phase and amount to the parent process:
# writes one "PHASE AMOUNT" line to the STATUS handle
sub _update_status {
    my($phase, $amt) = @_;

    debug("sending status @ $^T / $phase/$amt");

    print STATUS "$phase $amt\n";
}

sub _maybe_update_status {
    my $me = shift;

    $^T = time();

    return if $^T < ($me->{status_time} + $STATUSTIME);
    $me->{status_time} = $^T;
    _update_status( @_ );

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


    # md5 is twice as fast as sha1.
    # anything written  in perl is 10 times slower
    my $hash = unpack('N', md5( $key ));
    my $p    = $hash % $n;
    my $io   = $me->{outfd}[$p];
    $io->output( encode_json( [ $key, $data ] ), "\n" );
}


# the end-user's 'print' calls end up here
sub eu_print_stdout {
    my($me, @text) = @_;

    # the pieces are joined into one string and sent as a 'stdout' message
    _send_eumsg($me, 'stdout', "@text");
}

sub eu_print_stderr {
    my($me, @text) = @_;

    # the pieces are joined into one string and sent as a 'stderr' message
    _send_eumsg($me, 'stderr', "@text");
}

################################################################

sub _do_map {
    my $me = shift;
    my $mr = $me->{mr};



( run in 1.322 second using v1.01-cache-2.11-cpan-de7293f3b23 )