Grid-Request
view release on metacpan or search on metacpan
lib/Grid/Request.pm view on Meta::CPAN
}
# Set the notification email address, if configured
my $email = $cmd->email();
if ($email) {
$logger->info("Setting DRM to not block emails.");
($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_BLOCK_EMAIL, "0");
_throw_drmaa("Unable to unblock emails.", $error, $diagnosis) if $error;
$logger->info("Setting the job email.");
($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_EMAIL, [$email]);
_throw_drmaa("Unable to set the job email.", $error, $diagnosis) if $error;
}
my @drm_methods = qw(account hosts opsys evictable priority memory
project class runtime);
my @native_attrs;
foreach my $method (@drm_methods) {
my $val = $cmd->$method;
if (defined $val) {
# Translate the user provided value to what the DRM understands by calling the
# DRM plugin...
my $attr = $self->_drm->$method($val);
$logger->debug(qq|DRM plugin mapped "$val" to "$attr".|);
push (@native_attrs, $attr) if defined $attr;
} else {
$logger->debug(qq|Nothing defined for "$method".|);
}
}
# Apply the pass_through, if configured
my $pass_through = $cmd->pass_through();
if ($pass_through) {
$logger->info("Adding job pass-through: $pass_through");
push (@native_attrs, $pass_through);
}
if ( scalar(@native_attrs) > 0 ) {
my $native = join(" ", @native_attrs);
$logger->debug(qq|Setting native attribute "$native".|);
($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_NATIVE_SPECIFICATION, $native);
_throw_drmaa("Unable to set native specification.", $error, $diagnosis) if $error;
}
return $jt;
}
# Private method for internal use only. This method is used to submit
# mw (Master/Worker) jobs, which are jobs that iterate over files in a directory,
# lines in a file, or elements in an array, by calling grid_request_worker.
sub _submit_mw {
$logger->debug("In _submit_mw.");
my ($self, $jt, $cmd) = @_;
unless (defined $jt && defined $cmd) {
Grid::Request::InvalidArgumentException->throw("Job template and/or command object are not defined.");
}
$logger->debug("Setting the command executable.");
my ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_REMOTE_COMMAND, $WORKER);
_throw_drmaa("Could not set command executable.") if $error;
# Calculate how many workers we need. First, calculate the number of iterations by
# examining the mw arguments
my $min_count;
foreach my $param ($cmd->params()) {
if ($param->type() ne "PARAM") {
my $count = $param->count();
if (! defined $min_count) {
$min_count = $count;
} else {
if (($count > 0) && ($count < $min_count)) {
$min_count = $count;
$logger->debug("New minimum iteration count of $min_count.");
}
}
}
}
# Approach for master/worker (mw) jobs:
#
# 1. For each parameter, create an argument that contains the argument type, and a
# list of the values to iterate over
# 2. Calculate the minimum number of iterations from the parameters. In other words,
# if the parameters' counts do not match, take the lowest count among them
# so that every iteration has a defined value for each parameter.
# 3. Based on the number of iterations N, launch a number of workers on the grid to
# process these where the number is some function of N, f(N).
# 4. Pass the path of the exe to the worker program, so that it knows what to execute
# The worker will know what portion of the work to do by the task id that the DRM
# gives it. In SGE, this is done with an environment variable: SGE_TASK_ID.
# 5. Worker will replace $(Index) and $(Name) placeholders with the iteration number
# or the value itself in the output file, error file, input file, args, etc...
#
# General form:
# /path/to/worker <executable> <iterations> <workers> \
# param:blah_blah_blah \
# dir:<directory>:blah_blah_blah \
# file:<file>:blah_blah_blah
#
# Example: /path/to/worker /path/to/user/command 1000 5 \
# dir:/path/to/user/directory:-d $(Name) \
# file:/path/to/user/file:-arg $(Name) \
# param:-plain_arg
#
# We use a helper utility and method to determine how to divide the work.
# We don't just pass the min_count, because maybe different types of jobs
# should be split up differently. This is why we pass $cmd, so that more
# intelligent analysis may be done if configured...
my $block_size = $cmd->block_size();
if (ref($block_size) eq "CODE") {
$logger->debug("Detected a code reference for block size.");
my $block_size_calculator = $block_size;
$logger->debug("Invoking the code to determine the block size.");
$block_size = $block_size_calculator->($cmd, $min_count);
if (defined($block_size) && length($block_size) && $block_size =~ /^-?\d+$/) {
if ($block_size > 0) {
$logger->debug("Invocation yielded a block size of $block_size.");
} else {
Grid::Request::Exception->throw(
"Block size code reference yielded an invalid result. Must be a positive integer.");
}
} else {
Grid::Request::Exception->throw(
"Block size code reference yielded an invalid result.");
}
} else {
$logger->debug("block_size is a regular scalar: $block_size");
}
# Compute the number of workers to invoke based on the block size.
my $workers = ceil($min_count / $block_size);
my $plurality = ($workers == 1) ? "worker" : "workers";
$logger->info("This master/worker command requires $workers $plurality.");
my $exe = $cmd->command();
my @params;
my $number_of_tasks = $min_count; # Just a variable rename for clarity
push (@params, $exe, $block_size);
my $delim = ':';
foreach my $param ($cmd->params()) {
my $arg_type;
my $type = $param->type();
if ($type eq "PARAM") {
$logger->debug("Found a regular (non-MW) parameter.");
push(@params, "param" . $delim . $param->value());
next;
} elsif ($type eq "DIR") {
$arg_type = "dir";
} elsif ($type eq "ARRAY") {
$arg_type = "array";
( run in 1.056 second using v1.01-cache-2.11-cpan-71847e10f99 )