view release on metacpan or search on metacpan
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
info => "client $req->{type} to $addr:$port; $info",
request => $send,
);
return $me;
}
# begin a client request.
# registers the timeout/read/shutdown handlers, runs the parent class's
# start(), arms the timeout with $TIMEOUT and writes the stored request.
# returns the client object itself.
sub start {
    my ($me) = @_;

    my @handlers = (
        [ 'timeout',  \&_timeout  ],
        [ 'read',     \&_read     ],
        [ 'shutdown', \&_shutdown ],
    );
    for my $h (@handlers){
        $me->set_callback( $h->[0], $h->[1] );
    }

    $me->SUPER::start();
    $me->timeout_rel($TIMEOUT);
    $me->write( $me->{request} );

    return $me;
}
sub _timeout {
my $me = shift;
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
}
sub _get_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
my $size = (stat($file))[7];
my $sha1 = sha1_file($file);
debug("get file '$file' size $size");
# send header
my $gb = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
}
sub _put_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
my($dir) = $file =~ m|^(.+)/[^/]+$|;
# mkpath
eval{ mkpath($dir, undef, 0755) };
# open tmp
my $tmp = "$file.tmp";
unless( open(F, "> $tmp") ){
problem("open file failed: $!");
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
# reply now
if( $x ){
reply( 200, 'OK', $io, $proto, $req );
}else{
debug("sending error, xfer/retrier failed, $io->{info}");
reply( 501, 'Error', $io, $proto, $req );
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
sub _mk_xfer {
my $loc = shift;
my $req = shift;
my $x = AC::MrGamoo::Xfer->new(
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
return $me;
}
# look up a config parameter.
# hands the remaining arguments to get_config_param() on the object
# stored in $me->{program} and returns whatever that returns.
sub get_config_param {
    my $me = shift;

    my $program = $me->{program};
    return $program->get_config_param(@_);
}
# set a config parameter.
# hands the remaining arguments to set_config_param() on the object
# stored in $me->{program} and returns whatever that returns.
sub set_config_param {
    my $me = shift;

    my $program = $me->{program};
    return $program->set_config_param(@_);
}
sub open_console {
my $me = shift;
my $fd;
socket($fd, PF_INET, SOCK_DGRAM, 0);
bind($fd, sockaddr_in(0, INADDR_ANY));
my $s = getsockname($fd);
my($port, $addr) = sockaddr_in($s);
lib/AC/MrGamoo/Job/Request.pm view on Meta::CPAN
debug("starting request $me->{info}");
delete $job->{request_pending}{$me->{id}};
my $x = $job->_send_request( $me->{server}, $me->{info}, $me->{proto}, $me->{request});
unless( $x ){
verbose("cannot start request");
return;
}
$x->set_callback('on_success', \&_cb_start_req, $me, $job, 1);
$x->set_callback('on_failure', \&_cb_start_req, $me, $job, 0);
$job->{request_running}{$me->{id}} = $me;
$x->start();
}
sub _cb_start_req {
my $io = shift;
my $evt = shift;
my $me = shift;
my $job = shift;
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
master => my_server_id(),
} );
unless( $x ){
verbose("cannot start task");
$me->failed($job);
return;
}
# no success cb here. we will either timeout, or get a TaskStatus msg.
$x->set_callback('on_failure', \&_cb_start_task_fail, $me, $job );
$me->started($job, 'task');
$x->start();
}
sub _cb_start_task_fail {
my $io = shift;
my $evt = shift;
my $me = shift;
my $job = shift;
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
master => my_server_id(),
} );
unless( $x ){
verbose("cannot start xfer");
$me->failed( $job );
return;
}
# no success cb here. we will either timeout, or get a XferStatus msg.
$x->set_callback('on_failure', \&_cb_start_xfer_fail, $me, $job );
$me->started($job, 'xfer');
$x->start();
}
sub _cb_start_xfer_fail {
my $io = shift;
my $evt = shift;
my $me = shift;
my $job = shift;
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
# message id put into outgoing requests: starts at this process's pid ($$)
# and is post-incremented each time new() builds a status request
my $msgid = $$;
sub new {
my $class = shift;
# addr, port, ...
debug('starting kibitz status client');
my $me = $class->SUPER::new( @_ );
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
content_length => 0,
want_reply => 1,
msgid => $msgid++,
}, {
lib/AC/MrGamoo/Retry.pm view on Meta::CPAN
# make the next attempt.
# takes the entry at index $me->{tries} from $me->{tryeach}, builds an
# object for it with the $me->{newobj} factory (entry first, then
# @{$me->{newargs}}), and bumps the attempt counter. if the factory returns
# false the attempt goes straight to _on_failure; otherwise the object's
# on_success/on_failure callbacks are pointed back at us and it is started.
sub _try {
    my $me = shift;

    # not called $a: a lexical of that name hides the variable sort() uses
    my $target = $me->{tryeach}[ $me->{tries} ];
    my $obj    = $me->{newobj}->( $target, @{$me->{newargs}} );

    $me->{tries} ++;
    debug("try $me->{tries}");

    unless( $obj ){
        return _on_failure(undef, undef, $me);
    }

    for my $cb ( [ 'on_success', \&_on_success ], [ 'on_failure', \&_on_failure ] ){
        $obj->set_callback( $cb->[0], $cb->[1], $me );
    }

    $obj->start();
}
sub _on_success {
my $x = shift;
my $e = shift;
my $me = shift;
debug("all done!");
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
unless( $AC::MrGamoo::CONF->check_acl( $ip ) ){
verbose("rejecting connection from $ip");
return;
}
my $me = $class->SUPER::new( info => 'tcp mrgamoo server', from_ip => $ip );
$me->start($fd);
$me->timeout_rel($TIMEOUT);
$me->set_callback('read', \&read);
$me->set_callback('timeout', \&timeout);
}
# timeout handler: log it and shut the connection.
# returns whatever shut() returns.
sub timeout {
    my ($conn) = @_;

    debug("connection timed out");
    return $conn->shut();
}
sub read {
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
debug("http get $base");
my $f = $HTTP{$base};
$f ||= \&http_notfound;
my( $content, $code, $text ) = $f->($url);
$code ||= 200;
$text ||= 'OK';
my $res = "HTTP/1.0 $code $text\r\n"
. "Server: AC/MrGamoo\r\n"
. "Connection: close\r\n"
. "Content-Type: text/plain; charset=UTF-8\r\n"
. "Content-Length: " . length($content) . "\r\n"
. "\r\n"
. $content ;
$me->write_and_shut($res);
}
################################################################
sub http_notfound {
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
if( $d->{multi} ){
# merge
@{ $me->{content}{$tag} }{ keys %$cfg } = values %$cfg;
}else{
$me->_die("redefinition of '$tag' section") if $me->{content}{$tag};
$me->{content}{$tag} = $cfg;
}
}
# store $ir under $me->{initres}.
# returns the stored value.
sub set_initres {
    my ($me, $ir) = @_;

    return $me->{initres} = $ir;
}
# replace the whole 'config' section ($me->{content}{config}) with $cfg.
# returns the stored value.
sub set_config {
    my ($me, $cfg) = @_;

    return $me->{content}{config} = $cfg;
}
# fetch one value from the 'config' section.
# returns $me->{content}{config}{$k} (undef if it is not set).
sub get_config_param {
    my ($me, $k) = @_;

    return $me->{content}{config}{$k};
}
# set one value in the 'config' section.
# stores $v at $me->{content}{config}{$k} and returns it.
sub set_config_param {
    my ($me, $k, $v) = @_;

    $me->{content}{config}{$k} = $v;
    return $me->{content}{config}{$k};
}
sub _check {
my $me = shift;
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
return $REGISTRY{$task};
}
$me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
$me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};
# compile
eval {
my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
# merge job config + opts.
$mr->set_config($me->{options});
$mr->set_initres($me->{initres});
$me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
$me->{R}{config}{jobid} = $me->{request}{jobid};
$me->{R}{config}{taskid} = $me->{request}{taskid};
$me->{mr} = $mr;
};
if(my $e = $@){
problem("cannot compile task: $e");
return;
}
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
debug("start $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid}");
my $io = AC::DC::IO::Forked->new(
\&AC::MrGamoo::Task::Running::_start_task, [ $me ],
info => "task $me->{request}{jobid}/$me->{request}{taskid}",
);
$me->{io} = $io;
$io->timeout_rel($TIMEOUT);
$io->set_callback('timeout', \&_timeout);
$io->set_callback('read', \&_read, $me);
$io->set_callback('shutdown', \&_shutdown, $me);
$io->start();
}
sub abort {
my $me = _find(shift, @_);
return unless $me;
debug("abort task $me->{request}{taskid}");
$me->{io}->shut() if $me->{io};
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
}, {
jobid => $me->{request}{jobid},
taskid => $me->{request}{taskid},
phase => $me->{status}{phase},
progress => $me->{status}{amt},
} );
return unless $x;
$me->{_status_underway} ++;
$x->set_callback('shutdown', \&_send_status_done, $me);
$x->start();
}
sub attr {
my $me = shift;
my $bk = shift;
my $p = shift;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# built-in defaults; each can be overridden as noted on its line.
# $MAXRUN is the value handed to alarm() in _setup_stdio_etal, so it is in seconds.
my $MAXRUN = 3600; # override with %attr maxrun
my $SORTPROG = '/usr/bin/sort'; # override with %attr sortprog or config file
my $GZPROG = '/usr/bin/gzcat'; # override with %attr gzprog or config file
# in child process
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
$me->{R}{func_output} = sub{ _output_partition($me, $n, @_) };
$me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };
eval {
_setup_outfiles( $me );
if( $me->{request}{phase} eq 'map' ){
_do_map( $me );
}elsif( $me->{request}{phase} eq 'final' ){
_do_final( $me );
}elsif( $me->{request}{phase} =~ /^reduce/ ){
_do_reduce( $me );
}else{
die "unknown map/reduce phase '$me->{request}{phase}'\n";
}
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
_send_eumsg($me, 'stderr', "ERROR: $myid - $e");
_update_status( 'FAILED', 0 );
}
_close_outfiles( $me );
_update_status( 'FINISHED', 0 );
debug("finish child task");
exit(0);
}
# prepare the child's standard handles, signal handlers, syslog and run-time limit.
# takes no arguments. afterwards:
#   STATUS is a dup of the original STDOUT, with autoflush on
#   STDOUT writes to /dev/null, STDIN reads from /dev/null
#   SIGALRM dies with "timeout\n", and an alarm is armed for $MAXRUN seconds
# NOTE(review): none of the open() calls are checked for failure.
sub _setup_stdio_etal {
# move socket to parent from STDOUT -> STATUS
# so user code doesn't trample
open( STATUS, ">&STDOUT" );
# the dup above must happen before STDOUT is closed and pointed at /dev/null
close STDOUT; open( STDOUT, ">/dev/null");
close STDIN; open( STDIN, "/dev/null");
# $| applies to the selected handle: unbuffer STATUS, then leave STDOUT selected
select STATUS; $| = 1; select STDOUT;
# install a do-nothing SIGCHLD handler
$SIG{CHLD} = sub{};
# turn SIGALRM into a "timeout\n" exception
$SIG{ALRM} = sub{ die "timeout\n" };
# syslog facility comes from the 'syslog' config value, else local4
openlog('mrgamoo', 'ndelay, pid', (conf_value('syslog') || 'local4'));
# SIGALRM will arrive after $MAXRUN seconds
alarm( $MAXRUN );
}
# create the end-user console object for this task.
# builds an AC::MrGamoo::EUConsole from the request's jobid and console
# fields and keeps it in $me->{euconsole}.
sub _setup_console {
    my ($me) = @_;

    debug("setup console: $me->{request}{jobid}, $me->{request}{console}");

    my @console_args = ( $me->{request}{jobid}, $me->{request}{console} );
    return $me->{euconsole} = AC::MrGamoo::EUConsole->new( @console_args );
}
sub _send_eumsg {
my $me = shift;
my $type = shift;
my $msg = shift;
return unless $me->{euconsole};
$me->{euconsole}->send_msg($type, $msg);
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# rate-limited status update.
# refreshes $^T to the current time, and does nothing more if fewer than
# $STATUSTIME seconds have passed since $me->{status_time}. otherwise
# records the new time and passes the remaining arguments to _update_status.
sub _maybe_update_status {
    my $me = shift;

    $^T = time();

    my $next_due = $me->{status_time} + $STATUSTIME;
    return if $^T < $next_due;

    $me->{status_time} = $^T;
    return _update_status( @_ );
}
sub _setup_outfiles {
my $me = shift;
my @out;
my $gz = $me->attr(undef, 'compress');
for my $file ( @{$me->{request}{outfile}} ){
my $f = conf_value('basedir') . '/' . $file;
my($dir) = $f =~ m|^(.+)/[^/]+$|;
eval{ mkpath($dir, undef, 0777) };
push @out, AC::MrGamoo::OutFile->new( $f, $gz );
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
$me->_start();
}
# really start the transfer.
# logs the copyid, arms the timeout with $TIMEOUT, registers the
# timeout/shutdown handlers and then runs the parent class's start().
sub _start {
    my ($me) = @_;

    debug("start xfer $me->{request}{copyid}");
    $me->timeout_rel($TIMEOUT);

    for my $cb ( [ 'timeout', \&timeout ], [ 'shutdown', \&shutdown ] ){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    return $me->SUPER::start();
}
sub _run_child {
my $srcname = shift; # on remote system
my $dstname = shift; # on local system
my $tmpfile = shift;
my $loc = shift;
my $req = shift;