view release on metacpan or search on metacpan
eg/example.mrjob view on Meta::CPAN
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $
<%doc>
map reduce example
</%doc>
%################################################################
%# Provide values for configurable parameters.
%# These override the defaults, and params specified
%# on the command line override these.
<%config>
system => blargh
tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
use AC::Misc;
use AC::Dumper;
eg/example.mrjob view on Meta::CPAN
<%init>
$R->print("starting map/reduce job");
return {
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
sortprog => /bin/sort
</%attr>
my $data = shift; # one record from the input
# return a key + a value
return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
eg/example.mrjob view on Meta::CPAN
return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
use_strict => 0
in_package => My::Private::Space
</%attr>
<%init>
# init sub-block runs at start of final block
my $report;
</%init>
<%cleanup>
# cleanup sub-block runs at end of final block
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
# Read handler for a client request.
#
# Pulls the reply off the connection, decodes it, and:
#   - on a protocol-level error or a non-200 status, reports the failure
#     through _uh_oh (error callback + shut);
#   - otherwise stores the decoded reply in $me->{result}, sets
#     $me->{status_ok} and shuts the connection.
#
# Params: $me (the client object), $evt (the I/O event).
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my ($proto, $data) = read_protocol_no_content( $me, $evt );
    # nothing usable has been read yet
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # must be _uh_oh (defined below); the old name "_uhoh" does not exist,
        # so the error path died instead of running the error callback
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}
# Shared failure path: log the message, pass it to the registered 'error'
# callback as { error => $msg }, then close this connection.
sub _uh_oh {
    my ($me, $msg) = @_;

    debug("error $msg");
    $me->run_callback( error => { error => $msg } );
    $me->shut();
}
1;
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
my $size = (stat($file))[7];
my $sha1 = sha1_file($file);
debug("get file '$file' size $size");
# send header
my $gb = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
my $hdr = AC::MrGamoo::Protocol->encode_header(
type => $proto->{type},
msgidno => $proto->{msgidno},
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
my($dir) = $file =~ m|^(.+)/[^/]+$|;
# mkpath
eval{ mkpath($dir, undef, 0755) };
# open tmp
my $tmp = "$file.tmp";
unless( open(F, "> $tmp") ){
problem("open file failed: $!");
return nbfd_reply(500, 'error', $fd, $proto, $req);
}
# read + write
my $size = $proto->{content_length};
my $sha1 = $req->{hash_sha1};
verbose("put file '$file' size $size");
if( $content ){
syswrite( F, $content );
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
}
eval {
my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
close F;
die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
};
if(my $e = $@){
unlink $tmp;
verbose("error: $e");
nbfd_reply(500, 'error', $fd, $proto, $req);
return;
}
rename $tmp, $file;
nbfd_reply(200, 'OK', $fd, $proto, $req);
}
1;
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
if( $gpid ){
# parent
_exit(0);
}else{
# orphaned child
eval {
$func->($io, $proto, $req, @_);
};
if(my $e = $@){
chomp $e;
verbose("child error: $e");
_exit(1);
}
_exit(0);
}
}
}
1;
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
my $x = AC::MrGamoo::Retry->new(
newobj => \&_mk_xfer,
newargs => [ $req ],
tryeach => $req->{location},
);
# reply now
if( $x ){
reply( 200, 'OK', $io, $proto, $req );
}else{
debug("sending error, xfer/retrier failed, $io->{info}");
reply( 501, 'Error', $io, $proto, $req );
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $me = shift;
my $fd = $me->{console_fd};
while(1){
my $buf;
recv $fd, $buf, 65535, 0;
my $proto = AC::MrGamoo::Protocol->decode_header($buf);
my $data = substr($buf, AC::MrGamoo::Protocol->header_size());
my $req = AC::MrGamoo::Protocol->decode_request($proto, $data);
last if $req->{type} eq 'finish';
print STDERR "$req->{msg}" if $req->{type} eq 'stderr';
print "$req->{msg}" if $req->{type} eq 'stdout';
$me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
}
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
$r->{eu_print_stdout} = sub { print STDERR "@_\n" };
# run init section
my $h_init = $mr->get_code( 'init' );
my $initres = ($h_init ? $h_init->{code}() : undef) || {};
$me->{id} = unique();
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobcreate',
msgidno => $^T,
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
$me->_cleanup_tasks();
$me->_cleanup_files();
return if $me->{aborted};
$me->{aborted} = 1;
# move to final state
$me->{phase_no} = @{$me->{plan}{taskplan}};
$me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}
################################################################
sub _cleanup_files {
my $me = shift;
$me->{_cleanedup} = 1;
$me->{statistics}{cleanup_start} = time();
$me->{statistics}{job_time} = time() - $me->{statistics}{job_start};
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
$me->{status_ok} = 1;
eval {
my $resp = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
for my $update ( @{$resp->{status}} ){
AC::MrGamoo::Kibitz::Peers->update( $update );
}
};
if(my $e = $@){
verbose("error: $e");
}
$me->shut();
}
lib/AC/MrGamoo/MySelf.pm view on Meta::CPAN
emacs /myperldir/Local/MrGamoo/MySelf.pm
copy. paste. edit.
use lib '/myperldir';
my $m = AC::MrGamoo::D->new(
class_myself => 'Local::MrGamoo::MySelf',
);
=head1 DESCRIPTION
Provide functions to override the default behavior. You may define
any or all of the following functions.
=head2 my_server_id
return a unique identity for this mrgamoo instance. typically,
something similar to the server hostname.
sub my_server_id {
return 'mrm@' . hostname();
}
lib/AC/MrGamoo/Protocol.pm view on Meta::CPAN
my $io = shift;
my $evt = shift;
if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
# decode header
eval {
$io->{proto_header} = __PACKAGE__->decode_header( $io->{rbuffer} );
};
if(my $e=$@){
verbose("cannot decode protocol header: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decode protocol: $e",
});
$io->shut();
return;
}
}
return $io->{proto_header};
}
# for simple status queries, argus, debugging
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
return unless $prog;
my $c = eval $prog;
die $@ if $@;
return $c;
}
# Abort compilation with a formatted error. When the parser knows the
# current line number, the message also carries that number and the
# offending source line; otherwise only the file name is reported.
sub _die {
    my ($me, $err) = @_;

    my $where = $me->{_lineno}
        ? "file: $me->{file} line: $me->{_lineno}\n$me->{_line}"
        : "file: $me->{file}";

    die "ERROR: $err\n$where\n";
}
sub _next {
my $me = shift;
return unless @{ $me->{lines} };
$me->{_line} = shift @{ $me->{lines} };
$me->{_lineno} ++;
$me->{_file_content} .= $me->{_line};
return $me->{_line};
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
if( $d->{tag} eq 'block'){
$me->_add_block($tag, $me->_compile_block($tag));
}
elsif( $d->{tag} eq 'simple' ){
$me->_add_block($tag, $me->_compile_block_simple($tag));
}
elsif( $d->{tag} eq 'config' ){
$me->_add_config($tag, $me->_compile_config($tag));
}
else{
$me->_die("syntax error");
}
}
delete $me->{_lineno};
delete $me->{_line};
delete $me->{_fd};
1;
}
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
last if $line =~ m|^</%$tag>\s*$|;
my($tag) = $line =~ m|^<%(.*)>\s*$|;
if( $BLOCK{$tag} eq 'simple' ){
$b->{$tag} .= $me->_compile_block_simple( $tag );
$b->{code} .= $me->_lineno_info();
}elsif( $BLOCK{$tag} eq 'config' ){
$b->{$tag} = $me->_compile_config( $tag );
}elsif( $tag ){
$me->_die("syntax error");
}else{
$b->{code} .= $line;
}
}
return $b;
}
sub _compile_block_simple {
lib/AC/MrGamoo/Submit/Request.pm view on Meta::CPAN
# Report progress through the optional func_progress callback.
# Arguments after $me are passed straight through. With no callback
# installed this is a no-op.
sub progress {
    my $me = shift;
    my $callback = $me->{func_progress};
    $callback && $callback->(@_);
}
# Tie STDOUT and STDERR to AC::MrGamoo::Submit::TieIO. Each tie receives the
# matching end-user print callback (eu_print_stdout / eu_print_stderr).
sub redirect_io {
    my ($me) = @_;
    my $tie_class = 'AC::MrGamoo::Submit::TieIO';
    my ($to_stdout, $to_stderr) = @{$me}{qw(eu_print_stdout eu_print_stderr)};

    tie *STDOUT, $tie_class, $to_stdout;
    tie *STDERR, $tie_class, $to_stderr;
}
# Send text to the end user's stderr sink (eu_print_stderr), if configured.
sub print_stderr {
    my $me = shift;
    my $emit = $me->{eu_print_stderr};
    $emit && $emit->(@_);
}
# Send text to the end user's stdout sink (eu_print_stdout), if configured.
sub print {
    my $me = shift;
    my $emit = $me->{eu_print_stdout};
    $emit && $emit->(@_);
}
# print_stdout is an alias for print
*print_stdout = \&print;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
use AC::Daemon;
use Digest::SHA1 'sha1';
use Digest::MD5 'md5';
use File::Path;
use Sys::Syslog;
use Socket;
use JSON;
use strict;
# Built-in defaults for child tasks. As noted per line, some can be
# overridden per job via %attr in the job file, or via the config file.
# NOTE(review): presumably the interval between status reports -- confirm.
my $STATUSTIME = 5; # seconds
# NOTE(review): units not shown here; presumably seconds -- confirm.
my $MAXRUN = 3600; # override with %attr maxrun
my $SORTPROG = '/usr/bin/sort'; # override with %attr sortprog or config file
my $GZPROG = '/usr/bin/gzcat'; # override with %attr gzprog or config file
# in child process
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
};
if( my $e = $@ ){
my $myid = my_server_id();
verbose( "ERROR: $myid - $e" );
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
_update_status( 'FINISHED', 0 );
debug("finish child task");
exit(0);
}
sub _setup_stdio_etal {
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
}
# end-user's 'print' come here
# Forward an end-user stdout print to the console session. The arguments
# are joined with the list separator ($") and sent as a 'stdout' message.
sub eu_print_stdout {
    my ($me, @parts) = @_;
    my $text = "@parts";
    _send_eumsg($me, 'stdout', $text);
}
# Forward an end-user stderr print to the console session. The arguments
# are joined with the list separator ($") and sent as a 'stderr' message.
sub eu_print_stderr {
    my ($me, @parts) = @_;
    my $text = "@parts";
    _send_eumsg($me, 'stderr', $text);
}
################################################################
sub _do_map {
my $me = shift;
my $mr = $me->{mr};
debug("doing map");
my $n = @{$me->{request}{outfile}};
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
unless( open( $fd, "> $tmpfile" ) ){
verbose("cannot open output file '$tmpfile': $!");
return;
}
my $chk = _sendfile($oreq, $fd, $s, $size);
my $sha1 = $p->{data}{hash_sha1};
die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
};
if(my $e=$@){
debug("error: $e");
return;
}
return $p;
}
sub _sendfile {
my $req = shift;
my $out = shift;
my $in = shift;