HPC-Runner-Command
view release on metacpan or search on metacpan
lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler/ResolveDeps/AssignTaskDeps.pm view on Meta::CPAN
package HPC::Runner::Command::submit_jobs::Utils::Scheduler::ResolveDeps::AssignTaskDeps;
use Moose::Role;
use Memoize;
use List::MoreUtils 0.428 qw(first_index indexes uniq);
use List::Util qw(first);
=head3 update_job_scheduler_deps_by_task
#TODO do this after all the batches for a single job have been passed
#DEPRECATED job_scheduler_ids_by_array
for job in jobs
for batch in batches
for task in tasks
=cut
# Walk every schedule, resolve per-task scheduler dependencies for each
# job whose submission did not fail, then hand off to update_job_deps.
# The return value is whatever update_job_deps returns.
sub update_job_scheduler_deps_by_task {
    my ($self) = @_;

    $self->app_log->info(
        'Updating task dependencies. This may take some time...');

    for my $schedule ( $self->all_schedules ) {
        # Jobs that failed to submit have no scheduler ids to link against.
        if ( $self->jobs->{$schedule}->submission_failure ) {
            next;
        }

        # batch_scheduler_ids_by_task works on whatever current_job is set to.
        $self->current_job($schedule);
        $self->batch_scheduler_ids_by_task;
    }

    ##TODO consider changing this to each schedule
    return $self->update_job_deps;
}
# For the job named by $self->current_job, seed batch_counter with the job's
# first batch index, collect the batch-level dependency map from
# process_all_batch_deps, and resolve task-level scheduler ids for each
# dependency job via dep_scheduler_ids_by_task.
# Jobs without dependencies are skipped entirely.
sub batch_scheduler_ids_by_task {
    my $self = shift;

    my $job = $self->jobs->{ $self->current_job };
    return unless $job->has_deps;

    $self->batch_counter( $job->{batch_index_start} );

    # Maps dependency job name => per-batch dependency indices.
    my $scheduler_index = $self->process_all_batch_deps;

    # keys() (not each()) so the hash's shared iterator is reset and every
    # dependency job is visited, even if other code left an each() loop
    # over this hash unfinished.
    for my $dep_job ( keys %{$scheduler_index} ) {
        $self->dep_scheduler_ids_by_task( $dep_job,
            $scheduler_index->{$dep_job} );
    }

    return;
}
# Memo used by check_find_dep_indexes_cache:
#   job name => { batch index => [ scheduler_id, task_index ] }
# Starts out empty; clear_dep_scheduler_ids_by_task_cache resets it.
has dep_scheduler_ids_by_task_cache => (
    is      => 'rw',
    isa     => 'HashRef',
    clearer => 'clear_dep_scheduler_ids_by_task_cache',
    default => sub { +{} },
);
sub dep_scheduler_ids_by_task {
my $self = shift;
my $dep_job = shift;
my $dep_indices = shift;
for ( my $y = 0 ; $y < scalar @{$dep_indices} ; $y++ ) {
##This is the current_batch_index
my $batch_ref =
$self->check_find_dep_indexes_cache( $self->current_job, $y );
for ( my $z = 0 ; $z < scalar @{ $dep_indices->[$y] } ; $z++ ) {
#This is the dependency_batch_index
my $dep_index = $dep_indices->[$y]->[$z];
my $dep_ref =
$self->check_find_dep_indexes_cache( $dep_job, $dep_index );
my $array_dep = $self->build_task_deps(
$batch_ref->[0], $dep_ref->[0],
lib/HPC/Runner/Command/submit_jobs/Utils/Scheduler/ResolveDeps/AssignTaskDeps.pm view on Meta::CPAN
# my $dep_task_index = shift;
my $array_dep = [ $batch_scheduler_id, $dep_scheduler_id, ];
return $array_dep;
}
# Look up (and memoize) the [ scheduler_id, task_index ] pair for batch
# $index of job $job. task_index is the batch's cmd_start offset by the
# job's cmd_start. Results live in dep_scheduler_ids_by_task_cache, so later
# changes to the batch are not seen until that cache is cleared.
sub check_find_dep_indexes_cache {
    my ( $self, $job, $index ) = @_;

    my $cache = $self->dep_scheduler_ids_by_task_cache;

    # Cache hit: return the previously stored pair (same array reference).
    return $cache->{$job}->{$index}
      if exists $cache->{$job}->{$index};

    # NOTE(review): this mixes direct hash-slot access ({batches},
    # {scheduler_id}, {cmd_start}) with accessors (batches, cmd_start);
    # confirm the slots always agree with the accessors.
    my $scheduler_id =
      $self->jobs->{$job}->{batches}->[$index]->{scheduler_id};
    my $task_index =
      $self->jobs->{$job}->batches->[$index]->cmd_start
      + $self->jobs->{$job}->{cmd_start};

    return $cache->{$job}->{$index} = [ $scheduler_id, $task_index ];
}
# Record one dependency edge. $array_dep is a
# [ batch_scheduler_id, dep_scheduler_id ] pair; the dependency id is
# appended to array_deps->{batch_scheduler_id}, creating the list on first use.
sub push_array_deps {
    my ( $self, $array_dep ) = @_;

    my ( $batch_id, $dep_id ) = ( $array_dep->[0], $array_dep->[1] );

    if ( !$self->exists_array_dep($batch_id) ) {
        # First dependency for this batch: start a fresh list.
        return $self->array_deps->{$batch_id} = [$dep_id];
    }

    # Existing batch: append (duplicates are tolerated here).
    return push @{ $self->array_deps->{$batch_id} }, $dep_id;
}
# Normalize array_deps in place: each batch's dependency list is
# de-duplicated and sorted in string (lexical) order.
sub clean_array_deps {
    my $self = shift;

    my $array_deps = $self->array_deps;

    # keys() resets the hash's shared iterator, so every batch is visited
    # even if earlier code left an each() loop over this hash unfinished.
    # Replacing values of existing keys while looping is safe.
    for my $batch_id ( keys %{$array_deps} ) {
        # Explicit comparator: a bare `sort uniq ...` would parse uniq as
        # the sort SUBNAME. Ordering stays lexical, as before.
        $array_deps->{$batch_id} =
          [ sort { $a cmp $b } uniq( @{ $array_deps->{$batch_id} } ) ];
    }

    return;
}
=head3 update_scheduler_ids_by_array
Update the scheduler ids by the task/batch
#TODO There must be a better way to do this
=cut
# Copy the scheduler id of the job-array submission that covers the
# current batch onto $self->current_batch. Logs a warning and returns
# early when index_in_batch cannot place the batch.
sub update_scheduler_ids_by_array {
    my $self = shift;

    my $job = $self->current_job;

    # batch_counter is 1-indexed; index_in_batch takes a 0-based index.
    my $current_batch_index = $self->batch_counter - 1;
    my $index_in_batch = $self->index_in_batch( $job, $current_batch_index );

    if ( !defined $index_in_batch ) {
        $self->app_log->warn( "Job "
              . $job
              . " does not have an appropriate index. If you think you are reaching this in error please report the issue to github.\n"
        );
        return;
    }

    my $batch_scheduler_id =
      $self->jobs->{$job}->scheduler_ids->[$index_in_batch];

    ##If there is no batch id, that means something went wrong with submission
    $self->current_batch->scheduler_id($batch_scheduler_id)
      if $batch_scheduler_id;
}
=head3 index_in_batch
Using job arrays, each job is divided into one or more batches of size self->max_array_size
max_array_size = 10
001_job.sh --array=1-10
002_job.sh --array=11-20
self->jobs->{a_job}->all_batch_indexes
job001 => [
{batch_index_start => 1, batch_index_end => 10 },
{batch_index_start => 11, batch_index_end => 20}
]
The index argument is zero indexed, and our counters (job_counter, batch_counter) are 1 indexed
=cut
# Find which job-array submission of $job contains batch $index.
# $index is zero-based while the stored batch ranges are one-based, so it
# is shifted up by one before the search in check_batch_index.
sub index_in_batch {
    my ( $self, $job, $index ) = @_;

    my $batch_ranges = $self->jobs->{$job}->batch_indexes;

    return check_batch_index( $batch_ranges, ++$index );
}
# Cache check_batch_index results across calls.
# NOTE(review): Memoize's default key stringifies the arguments, so the
# $batches array reference is keyed by its address (ARRAY(0x...)), not its
# contents. If a batch_indexes array is modified in place, or a freed array's
# address is reused for another job, a stale index could be returned.
# Confirm this cannot happen, or supply a NORMALIZER.
memoize('check_batch_index');
sub check_batch_index {
my $batches = shift;
my $search_index = shift;
my $x = first_index {
( run in 0.714 second using v1.01-cache-2.11-cpan-39bf76dae61 )