view release on metacpan or search on metacpan
eg/example.mrjob view on Meta::CPAN
%# used to load modules
<%common>
use AC::Misc;
use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
$R->print("starting map/reduce job");
return {
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
eg/example.mrjob view on Meta::CPAN
<%init>
# init sub-block runs at start of final block
my $report;
</%init>
<%cleanup>
# cleanup sub-block runs at end of final block
# get the values from the init block
my $iv = $R->initvalue();
# stdout is tied to $R. so plain print also works.
print "report for mood: $iv->{mood}\n";
print $report;
</%cleanup>
my($key, $data) = @_;
$report .= "key: $key, value: $data\n";
</%final>
$mrm->check_code();
exit;
}
# In console mode, open the end-user console before submitting the job.
$mrm->open_console() if $opt{console};
# run job
# submit() is given the seed server list; a false return is treated as failure.
my $id = $mrm->submit( seedlist() );
die "could not run job\n" unless $id;
print STDERR "job: $id\n" if $opt{verbose};
# On SIGINT/SIGQUIT, abort the job via the client and then exit.
# NOTE(review): the handler is installed only after submit() returns, so an
# interrupt that arrives during submission will not abort the job.
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
# In console mode, hand control to the console loop
# (presumably relays job output until the job finishes - confirm).
$mrm->run_console() if $opt{console};
exit;
################################################################
sub seedlist {
# determine list of servers to try
return '10.200.2.3:3504';
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $src = shift;
my $cfg = shift;
my $host = hostname();
my $user = getpwuid($<);
my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');
my $me = bless {
traceinfo => $trace,
}, $class;
$me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};
# compile job
my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
$me->{program} = $mr;
# merge job %config section with passed in config
$mr->add_config($cfg);
return $me;
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $me = shift;
my $fd = $me->{console_fd};
while(1){
my $buf;
recv $fd, $buf, 65535, 0;
my $proto = AC::MrGamoo::Protocol->decode_header($buf);
my $data = substr($buf, AC::MrGamoo::Protocol->header_size());
my $req = AC::MrGamoo::Protocol->decode_request($proto, $data);
last if $req->{type} eq 'finish';
print STDERR "$req->{msg}" if $req->{type} eq 'stderr';
print "$req->{msg}" if $req->{type} eq 'stdout';
$me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
}
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
$r->{eu_print_stdout} = sub { print STDERR "@_\n" };
# run init section
my $h_init = $mr->get_code( 'init' );
my $initres = ($h_init ? $h_init->{code}() : undef) || {};
$me->{id} = unique();
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobcreate',
msgidno => $^T,
want_reply => 1,
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
};
if(my $e = $@){
problem("cannot compile job: $e");
return;
}
# RSN - get_file_list + Plan may take too long - do in sub-process
# get file list
my $files = get_file_list( $cf );
#print STDERR "files: ", dumper($files), "\n";
for my $f (@$files){
$me->{file_info}{ $f->{filename} } = $f;
}
# get server list
my $servers = get_peer_list( $cf );
#print STDERR "servers: ", dumper($servers), "\n";
# plan job
my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
#print STDERR "plan: ", dumper($plan), "\n";
$me->{plan} = $plan;
$me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});
$me->{server_info}{$_->{id}} = {} for @$servers;
$me->_preload_file_copies();
$REGISTRY{ $me->{request}{jobid} } = $me;
return $me;
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
$ph ||= 'cleanup' if $j->{phase_no} >= @{$j->{plan}{phases}};
$ph ||= $j->{plan}{phases}[ $j->{phase_no} ];
my $tr = keys %{$j->{task_running}};
my $tp = keys %{$j->{task_pending}};
my $cr = keys %{$j->{copy_running}};
my $cp = keys %{$j->{copy_pending}};
my $rr = keys %{$j->{request_running}};
my $rp = keys %{$j->{request_pending}};
$txt .= sprintf("%s %8s %4d %4d %4d %4d %4d %4d\n", $j->{request}{jobid}, $ph, $tr, $tp, $cr, $cp, $rr, $rp);
# ...
}
return $txt;
}
1;
lib/AC/MrGamoo/Kibitz/Peers.pm view on Meta::CPAN
}
# Build a plain-text report with one line per peer from peer_list_all().
#
# Each line contains: server_id (padded to 30 columns), subsystem,
# environment, datacenter, status, sort_metric (2 decimals), then two ages
# in whole seconds: $^T minus the peer's 'lastup' and $^T minus its
# 'timestamp'.
# NOTE(review): $^T is used as "now"; this relies on the process refreshing
# $^T with the current time - confirm.
#
# Returns the report text. When there are no peers, it returns an empty
# string rather than undef.
sub report {
    my $all = peer_list_all();
    # start from '' so callers never receive undef for an empty peer list
    my $txt = '';
    for my $p (@{ $all || [] }){
        my $lu = $^T - $p->{lastup};
        my $lh = $^T - $p->{timestamp};
        $txt .= sprintf("%-30s %s %s %s %3d %7.2f %d %d\n",
            $p->{server_id}, $p->{subsystem}, $p->{environment},
            $p->{datacenter}, $p->{status}, $p->{sort_metric},
            $lu, $lh,
        );
    }
    return $txt;
}
sub report_json {
lib/AC/MrGamoo/OutFile.pm view on Meta::CPAN
$me->_touch() unless $me->{been_opened};
$me->_close();
}
# Append data to this output file.
#
# Stamps 'lastused' with $^T (which has been updated with the current time).
# If a file handle is open, the data is printed straight to it. Otherwise the
# data is accumulated in the in-memory buffer, which is flushed once its
# length reaches $BUFMAX.
sub output {
    my $self = shift;
    $self->{lastused} = $^T;
    my $handle = $self->{fd};
    if( !$handle ){
        for my $chunk (@_){
            $self->{buffer} .= $chunk;
        }
        $self->_flush() if length($self->{buffer}) >= $BUFMAX;
    }else{
        print $handle @_;
    }
}
################################################################
sub DESTROY {
my $me = shift;
lib/AC/MrGamoo/OutFile.pm view on Meta::CPAN
# Write any buffered data to the output file, then discard the buffer.
#
# Before opening, _close_things() is called if $currently_open has reached
# $max_open (presumably to free a slot). Does nothing when the buffer is
# undefined or empty.
sub _flush {
    my $me = shift;
    # Test length, not truth: a buffer holding just "0" is real data that
    # a plain truth test would silently drop.
    return unless defined($me->{buffer}) && length($me->{buffer});
    _close_things() if $currently_open >= $max_open;
    $me->_open();
    my $fd = $me->{fd};
    print $fd $me->{buffer};
    delete $me->{buffer};
}
# Open the output file so that it exists even if nothing is ever written.
# When $currently_open has reached $max_open, _close_things() is called
# first (presumably to free a slot). Returns whatever _open() returns.
sub _touch {
    my $self = shift;
    my $at_limit = $currently_open >= $max_open;
    _close_things() if $at_limit;
    return $self->_open();
}
lib/AC/MrGamoo/Stats.pm view on Meta::CPAN
my $stat = shift;
$STATS{$stat} ++;
}
################################################################
# HTTP handler body: the first value returned by loadave(), printed with
# four decimal places and followed by a blank line.
sub http_load {
    # list assignment keeps loadave() in list context, as in sprintf's args
    my ($load) = loadave();
    my $body = sprintf("loadave: %0.4f\n\n", $load);
    return $body;
}
# HTTP handler body: one "name: value" line per counter in %STATS, with the
# name (plus colon) left-justified in 24 columns and keys in sorted order,
# followed by a terminating blank line.
sub http_stats {
    my @lines = map { sprintf("%-24s%s\n", "$_:", $STATS{$_}) } sort keys %STATS;
    return join('', @lines) . "\n";
}
# HTTP handler body: a constant health-check response.
sub http_status {
    my $status = "status: OK\n\n";
    return $status;
}
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
delete $me->{_line};
delete $me->{_fd};
1;
}
# Build a perl "#line" directive for the code that follows, so diagnostics
# point at the job file and line instead of the generated source.
#
# $me->{_lineno} is the line just consumed, so the directive names the
# _next_ line. The file name is passed to sprintf as an argument (never
# interpolated into the format), so names containing '%' are emitted
# verbatim. With no file name, only the line number is emitted.
#
# Returns the directive string, newline-terminated.
sub _lineno_info {
    my $me = shift;
    # should have the number of the _next_ line
    my $next = $me->{_lineno} + 1;
    my $file = $me->{file};
    return sprintf("#line %d\n", $next) unless defined($file) && length($file);
    return sprintf("#line %d %s\n", $next, $file);
}
sub _compile_block {
my $me = shift;
my $tag = shift;
my $b = AC::MrGamoo::Submit::Compile::Block->new();
$b->{code} = $me->_lineno_info();
lib/AC/MrGamoo/Submit/Request.pm view on Meta::CPAN
# and indicate progress via $R->progress
# Forward all progress arguments to the func_progress callback when one is
# installed; with no callback, this is a no-op.
sub progress {
    my $self = shift;
    my $callback = $self->{func_progress};
    $callback->( @_ ) if $callback;
}
# Tie STDOUT and STDERR to AC::MrGamoo::Submit::TieIO, handing each the
# matching eu_print_stdout / eu_print_stderr callback from this request.
sub redirect_io {
    my $self = shift;
    my $tie_class = 'AC::MrGamoo::Submit::TieIO';
    tie *STDOUT, $tie_class, $self->{eu_print_stdout};
    tie *STDERR, $tie_class, $self->{eu_print_stderr};
}
# Pass the arguments to the eu_print_stderr callback, if one is set.
sub print_stderr {
    my $self = shift;
    my $emit = $self->{eu_print_stderr};
    $emit->( @_ ) if $emit;
}
# Pass the arguments to the eu_print_stdout callback, if one is set.
# Meant to be invoked as a method ($R->print(...)); a bare print() call
# still resolves to the builtin.
sub print {
    my $self = shift;
    my $emit = $self->{eu_print_stdout};
    $emit->( @_ ) if $emit;
}
# print_stdout: alias for print, the counterpart of print_stderr.
*print_stdout = \&print;
1;
lib/AC/MrGamoo/Submit/TieIO.pm view on Meta::CPAN
return unless $me->{func};
$me->{func}->( @_ );
}
# Tied-handle PRINTF: sprintf the format and arguments, then pass the single
# resulting string to the 'func' callback and return its result. Returns
# nothing when no callback is configured.
sub PRINTF {
    my ($self, $fmt, @args) = @_;
    my $sink = $self->{func};
    return unless $sink;
    return $sink->( sprintf($fmt, @args) );
}
1;
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
}
# measure
for my $file (@{$me->{request}{infile}}){
my $s = (stat(conf_value('basedir') . '/' . $file))[7];
$me->{_inputsize} += $s
}
debug("input size: $me->{_inputsize}");
# print STDERR "Task: ", dumper($me), "\n";
$REGISTRY{$task} = $me;
return $me;
}
sub start {
my $me = shift;
# if too many tasks are running, queue
my $nrun = 0;
for my $t (values %REGISTRY){
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
return unless $me->{euconsole};
$me->{euconsole}->send_msg($type, $msg);
}
# Send a status update to the parent process: log it via debug(), then write
# one "PHASE AMOUNT" line to the STATUS handle.
# NOTE(review): STATUS is opened elsewhere, presumably as the channel to the
# parent process - confirm.
sub _update_status {
    my ($phase, $amt) = @_;
    debug("sending status @ $^T / $phase/$amt");
    print STATUS "$phase $amt\n";
}
sub _maybe_update_status {
my $me = shift;
$^T = time();
return if $^T < ($me->{status_time} + $STATUSTIME);
$me->{status_time} = $^T;
_update_status( @_ );
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# md5 is twice as fast as sha1.
# anything written in perl is 10 times slower
my $hash = unpack('N', md5( $key ));
my $p = $hash % $n;
my $io = $me->{outfd}[$p];
$io->output( encode_json( [ $key, $data ] ), "\n" );
}
# end-user's 'print' come here
# End-user stdout: join the arguments with $" (a space by default) and pass
# the text to _send_eumsg with message type 'stdout'.
sub eu_print_stdout {
    my ($me, @text) = @_;
    my $line = join $", @text;
    _send_eumsg($me, 'stdout', $line);
}
# End-user stderr: join the arguments with $" (a space by default) and pass
# the text to _send_eumsg with message type 'stderr'.
sub eu_print_stderr {
    my ($me, @text) = @_;
    my $line = join $", @text;
    _send_eumsg($me, 'stderr', $line);
}
################################################################
sub _do_map {
my $me = shift;
my $mr = $me->{mr};