AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
# Jobs known to this process, keyed by the request's jobid.
our %REGISTRY;

# Initialised to this process's id.
# NOTE(review): presumably the seed for generated message ids - confirm at the use sites.
our $MSGID = $$;

# NOTE(review): file-private flag; its use is not visible in this part of the file.
my $_trying;

# Open-file limit as reported by the shell (raw command output).
our $MAXFILE = qx{sh -c "ulimit -n"};

# On solaris never use a limit above 255.
if( $^O eq 'solaris' && $MAXFILE > 255 ){
    $MAXFILE = 255;
}
################################################################
# Register periodic() with the scheduler as the "job periodic" entry.
# freq is 2 (units are defined by AC::DC::Sched; presumably seconds - confirm).
AC::DC::Sched->new( info => 'job periodic', freq => 2, func => \&periodic );
################################################################
# Create, plan and register a new job from a job-creation request.
#
# Arguments (after the class name): the request as key/value pairs
# ( %{ APCMRMJobCreate } ). Fields read here: jobid, traceinfo,
# options (JSON text, optional), console, jobsrc.
#
# Returns:
#   - the already registered job, if one with the same jobid exists;
#   - nothing (bare return), if the job source cannot be compiled;
#   - otherwise the new job object, after storing it in %REGISTRY.
sub new {
    my $class = shift;

    # %{ APCMRMJobCreate }
    my $me = bless {
        request         => { @_ },
        phase_no        => -1,
        file_info       => {},
        tmp_file        => [],
        server_info     => {},
        task_running    => {},
        task_pending    => {},
        xfer_running    => {},
        xfer_pending    => {},
        request_running => {},
        request_pending => {},
        statistics      => { job_start => time() },
    }, $class;

    my $jobid = $me->{request}{jobid};

    if( $REGISTRY{ $jobid } ){
        verbose("ignoring duplicate request job $jobid");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{ $jobid };
    }

    verbose("new job: $jobid ($me->{request}{traceinfo})");

    # Declare $cf unconditionally. The previous form
    #   my $cf = ... if COND;
    # has undefined behaviour when COND is false: the variable can keep the
    # value assigned during an earlier call, so a job submitted without
    # options could be planned using the previous job's options.
    my $cf;
    if( $me->{request}{options} ){
        $cf = $me->{options} = decode_json( $me->{request}{options} );
    }

    # open connection to eu-console
    $me->{euconsole} = AC::MrGamoo::EUConsole->new( $jobid, $me->{request}{console} );

    # partially compile
    eval {
        $me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list, and index it by filename
    my $files = get_file_list( $cf );
    #print STDERR "files: ", dumper($files), "\n";
    for my $f (@$files){
        $me->{file_info}{ $f->{filename} } = $f;
    }

    # get server list
    my $servers = get_peer_list( $cf );
    #print STDERR "servers: ", dumper($servers), "\n";

    # plan job
    my $plan = AC::MrGamoo::Job::Plan->new( $me, $servers, $files );
    #print STDERR "plan: ", dumper($plan), "\n";
    $me->{plan} = $plan;

    # failure budget: 5 per planned task plus 5 per planned copy
    $me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});

    # one (initially empty) info record per server
    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();

    $REGISTRY{ $jobid } = $me;
    return $me;
}
# Start the job: log it, tell the eu-console, then begin scheduling work.
# Always returns 1.
sub start {
    my ($job) = @_;

    debug("start job");

    my $console = $job->{euconsole};
    $console->send_msg('debug', 'starting job');

    $job->_try_to_do_something();

    return 1;
}
################################################################
# Record a status report received from a task.
#
# The first argument and the remaining key/value pairs are handed to _find()
# to locate the job; the pairs read here are taskid, phase and progress.
# Returns nothing if no job is found or the task is not in the job's
# task_running table; otherwise forwards phase/progress to the task's
# update_status() and returns 1.
sub task_status {
    my $job  = _find(shift, @_);
    my %args = @_;

    return unless $job;

    my $task = $job->{task_running}{ $args{taskid} };
    return unless $task;

    $task->update_status( $job, $args{phase}, $args{progress} );
    return 1;
}
# record status rcvd from file xfer
sub xfer_status {
my $me = _find(shift, @_);
my %p = @_;
my $copyid = $p{copyid};
( run in 0.589 second using v1.01-cache-2.11-cpan-5837b0d9d2c )