view release on metacpan or search on metacpan
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
our $R; # exported by AC::MrGamoo::User
sub readinput {
my $fd = shift;
my $line = scalar <$fd>;
return (undef, 1) unless defined $line;
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
}
# filter input on date range. we could just as easily filter
# in 'map', but doing here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return if $d->{tstart} < $R->config('start');
return if $d->{tstart} >= $R->config('end');
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
my($dir) = $file =~ m|^(.+)/[^/]+$|;
# mkpath
eval{ mkpath($dir, undef, 0755) };
# open tmp
my $tmp = "$file.tmp";
unless( open(F, "> $tmp") ){
problem("open file failed: $!");
return nbfd_reply(500, 'error', $fd, $proto, $req);
}
# read + write
my $size = $proto->{content_length};
my $sha1 = $req->{hash_sha1};
verbose("put file '$file' size $size");
if( $content ){
syswrite( F, $content );
$size -= length($content);
}
eval {
my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
close F;
die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
};
if(my $e = $@){
unlink $tmp;
verbose("error: $e");
nbfd_reply(500, 'error', $fd, $proto, $req);
return;
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
return;
}else{
# child
my $gpid = fork();
if( $gpid ){
# parent
_exit(0);
}else{
# orphaned child
eval {
$func->($io, $proto, $req, @_);
};
if(my $e = $@){
chomp $e;
verbose("child error: $e");
_exit(1);
}
_exit(0);
}
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
type => 'mrgamoo_status',
msgidno => $^T,
want_reply => 1,
}, {});
# get the full list of servers
# contact each seed passed in above, until we get a reply
for my $s ( @$seed ){
my($addr, $port) = split /:/, $s;
$me->{fdebug}->("attempting to fetch server list from $addr:$port");
eval {
alarm(1);
my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
my $res = AC::MrGamoo::Protocol->decode_reply($reply);
alarm(0);
my $list = $res->{status};
@serverlist = @$list if $list && @$list;
};
last if @serverlist;
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
grep { $_->{status} == 200 } @serverlist;
# try all addresses
# RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?
my @addrlist = map { @{$_->{ip}} } @serverlist;
for my $ip (@addrlist){
my $addr = inet_itoa($ip->{ipv4});
my $res;
eval {
alarm(30);
$res = $me->_submit_to( $addr, $ip->{port}, $req );
alarm(0);
};
next unless $res && $res->{status_code} == 200;
$me->{master} = { addr => $addr, port => $ip->{port} };
return 1;
}
return ;
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
$me->_check('final');
return 1;
}
sub _check {
my $me = shift;
my $mr = $me->{program};
my $prog = $mr->compile(@_);
eval "sub $prog";
die $@ if $@;
}
1;
lib/AC/MrGamoo/Customize.pm view on Meta::CPAN
use strict;
sub customize {
my $class = shift;
my $implby = shift;
(my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;
# load user's implemantation + default
for my $p ($implby, $default){
eval "require $p" if $p;
die $@ if $@;
}
# import/export
no strict;
no warnings;
for my $f ( @{$class . '::CUSTOM'} ){
*{$class . '::' . $f} = ($implby && $implby->can($f)) || $default->can($f);
}
}
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
}
verbose("new job: $me->{request}{jobid} ($me->{request}{traceinfo})");
my $cf = $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
# open connection to eu-console
$me->{euconsole} = AC::MrGamoo::EUConsole->new( $me->{request}{jobid}, $me->{request}{console} );
# partially compile
eval {
$me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
};
if(my $e = $@){
problem("cannot compile job: $e");
return;
}
# RSN - get_file_list + Plan may take too long - do in sub-process
# get file list
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
my $me = shift;
my $evt = shift;
debug("recvd reply");
my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
return unless $proto;
$me->{status_ok} = 1;
eval {
my $resp = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
for my $update ( @{$resp->{status}} ){
AC::MrGamoo::Kibitz::Peers->update( $update );
}
};
if(my $e = $@){
verbose("error: $e");
}
$me->shut();
}
lib/AC/MrGamoo/Protocol.pm view on Meta::CPAN
return ($p, $data, $content);
}
sub _check_protocol {
my $io = shift;
my $evt = shift;
if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
# decode header
eval {
$io->{proto_header} = __PACKAGE__->decode_header( $io->{rbuffer} );
};
if(my $e=$@){
verbose("cannot decode protocol header: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decode protocol: $e",
});
$io->shut();
return;
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
# dispatch request
my $h = $HANDLER{ $proto->{type} };
unless( $h ){
verbose("unknown message type: $proto->{type}");
$me->shut();
return;
}
eval {
$data = AC::MrGamoo::Protocol->decode_request($proto, $data) if $data && $proto->{type} ne 'http';
};
if(my $e = $@ ){
problem("cannot decode request: $e");
$me->shut();
return;
}
debug("handling request - $proto->{type}");
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
}
sub get_code {
my $me = shift;
my $name = shift;
my $num = shift;
my $prog = $me->compile( $name, $num );
return unless $prog;
my $c = eval $prog;
die $@ if $@;
return $c;
}
sub _die {
my $me = shift;
my $err = shift;
if( $me->{_lineno} ){
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
if( $REGISTRY{$task} ){
verbose("ignoring duplicate request task $task");
# will cause a 200 OK, so the requestor will not retry
return $REGISTRY{$task};
}
$me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
$me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};
# compile
eval {
my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
# merge job config + opts.
$mr->set_config($me->{options});
$mr->set_initres($me->{initres});
$me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
$me->{R}{config}{jobid} = $me->{request}{jobid};
$me->{R}{config}{taskid} = $me->{request}{taskid};
$me->{mr} = $mr;
};
if(my $e = $@){
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
_do_map( $me );
}elsif( $me->{request}{phase} eq 'final' ){
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
sub _setup_outfiles {
my $me = shift;
my @out;
my $gz = $me->attr(undef, 'compress');
for my $file ( @{$me->{request}{outfile}} ){
my $f = conf_value('basedir') . '/' . $file;
my($dir) = $f =~ m|^(.+)/[^/]+$|;
eval{ mkpath($dir, undef, 0777) };
push @out, AC::MrGamoo::OutFile->new( $f, $gz );
}
$me->{outfd} = \@out;
}
sub _close_outfiles {
my $me = shift;
for my $io ( @{$me->{outfd}} ){
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
if( $REGISTRY{$req->{copyid}} ){
verbose("ignoring duplicate xfer $req->{copyid}");
return $REGISTRY{$req->{copyid}};
}
$dstname = conf_value('basedir') . '/' . $dstname;
my $tmpfile = $dstname . ".$$";
# mkpath
my($dir) = $dstname =~ m|^(.+)/[^/]+$|;
eval{ mkpath($dir, undef, 0755) };
my $me = $class->SUPER::new( \&_run_child,
[ $srcname, $dstname, $tmpfile, $loc, $req ],
info => "xfer $loc:$srcname",
request => $req,
rbufsize => 65536,
);
return unless $me;
$REGISTRY{$req->{copyid}} = $me;
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
debug("connecting to $addr:$port");
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'scribl_get',
msgidno => $$,
want_reply => 1,
}, { filename => $srcname } );
my $p;
eval {
# connect
my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
return unless $s;
# send req
AC::MrGamoo::Protocol->write_request($s, $req);
# get response
my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
$p = AC::MrGamoo::Protocol->decode_header($buf);