Grid-Request

 view release on metacpan or  search on metacpan

lib/Grid/Request.pm  view on Meta::CPAN

    }

    # Set the notification email address, if configured
    my $email = $cmd->email();
    if ($email) {
        $logger->info("Setting DRM to not block emails.");
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_BLOCK_EMAIL, "0");
        _throw_drmaa("Unable to unblock emails.", $error, $diagnosis) if $error;
        $logger->info("Setting the job email.");
        ($error, $diagnosis) = drmaa_set_vector_attribute($jt, $DRMAA_V_EMAIL, [$email]);
        _throw_drmaa("Unable to set the job email.", $error, $diagnosis) if $error;
    }
 
    my @drm_methods = qw(account hosts opsys evictable priority memory
                         project class runtime);
    my @native_attrs;
    foreach my $method (@drm_methods) {
        my $val = $cmd->$method;
        if (defined $val) {
            # Translate the user provided value to what the DRM understands by calling the
            # DRM plugin...
            my $attr = $self->_drm->$method($val);
            $logger->debug(qq|DRM plugin mapped "$val" to "$attr".|);
            push (@native_attrs, $attr) if defined $attr;
        } else {
            $logger->debug(qq|Nothing defined for "$method".|);
        }
    }
    # Apply the pass_through, if configured
    my $pass_through = $cmd->pass_through();
    if ($pass_through) {
        $logger->info("Adding job pass-through: $pass_through");
        push (@native_attrs, $pass_through);
    }

    if ( scalar(@native_attrs) > 0 ) {
        my $native = join(" ", @native_attrs);
        $logger->debug(qq|Setting native attribute "$native".|);
        ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_NATIVE_SPECIFICATION, $native);
        _throw_drmaa("Unable to set native specification.", $error, $diagnosis) if $error;
    }

    return $jt;
}


# Private method for internal use only. This method is used to submit
# mw (Master/Worker) jobs, which are jobs that iterate over files in a directory,
# lines in a file, or elements in an array, by calling grid_request_worker.
sub _submit_mw {
    $logger->debug("In _submit_mw.");
    my ($self, $jt, $cmd) = @_;
    unless (defined $jt && defined $cmd) {
        Grid::Request::InvalidArgumentException->throw("Job template and/or command object are not defined.");
    }

    $logger->debug("Setting the command executable.");
    my ($error, $diagnosis) = drmaa_set_attribute($jt, $DRMAA_REMOTE_COMMAND, $WORKER);
    _throw_drmaa("Could not set command executable.") if $error;

    # Calculate how many workers we need. First, calculate the number of iterations by
    # examining the mw arguments
    my $min_count;
    foreach my $param ($cmd->params()) {
        if ($param->type() ne "PARAM") {
            my $count = $param->count();
            if (! defined $min_count) {
                $min_count = $count;
            } else {
                if (($count > 0) && ($count < $min_count)) {
                    $min_count = $count;
                    $logger->debug("New minimum iteration count of $min_count.");
                }
            }
        }
    }

    # Approach for master/worker (mw) jobs:
    #
    # 1. For each parameter, create an argument that contains the argument type, and a 
    #    list of the values to iterate over
    # 2. Calculate the minimum number of iterations from the parameters. In other words,
    #    if there is a mismatch, then you have to take the lowest number of parameters
    #    so that all parameters have defined siblings.
    # 3. Based on the number of iterations N, launch a number of workers on the grid to
    #    process these where the number is some function of N, f(N).
    # 4. Pass the path of the exe to the worker program, so that it knows what to execute
    #    The worker will know what portion of the work to do by the task id that the DRM
    #    gives it. In SGE, this is done with an environment variable: SGE_TASK_ID.
    # 5. Worker will replace $(Index) and $(Name) placeholders with the iteration number
    #    or the value itself in the output file, error file, input file, args, etc.
    #
    #  General form:
    #  /path/to/worker <executable> <iterations> <workers>  \
    #                    param:blah_blah_blah               \
    #                    dir:<directory>:blah_blah_blah     \
    #                    file:<file>:blah_blah_blah
    #
    # Example: /path/to/worker /path/to/user/command 1000 5 \
    #            dir:/path/to/user/directory:-d $(Name)     \
    #            file:/path/to/user/file:-arg $(Name)       \
    #            param:-plain_arg
    #
    # We use a helper utility and method to determine how to divide the work.
    # We don't just pass the min_count, because different types of jobs may
    # need to be split up differently. This is why we pass $cmd, so that more
    # intelligent analysis may be done if configured.
    
    my $block_size = $cmd->block_size();
    if (ref($block_size) eq "CODE") {
        $logger->debug("Detected a code reference for block size.");
        my $block_size_calculator = $block_size;
        $logger->debug("Invoking the code to determine the block size.");
        $block_size = $block_size_calculator->($cmd, $min_count); 

        if (defined($block_size) && length($block_size) && $block_size =~ /^-?\d+$/) {                                                                          
            if ($block_size > 0) {                                                                              
                $logger->debug("Invocation yielded a block size of $block_size.");
            } else {
                Grid::Request::Exception->throw(                                                  
                    "Block size code reference yielded an invalid result. Must be a positive integer.");
            }
        } else {
            Grid::Request::Exception->throw(
                    "Block size code reference yielded an invalid result.");
        }
    } else {
        $logger->debug("block_size is a regular scalar: $block_size");
    }

    # Compute the number of workers to invoke based on the block size.
    my $workers = ceil($min_count / $block_size);

    my $plurality = ($workers == 1) ? "worker" : "workers";
    $logger->info("This master/worker command requires $workers $plurality.");

    my $exe = $cmd->command();
    my @params;
    my $number_of_tasks = $min_count;  # Just a variable rename for clarity

    push (@params, $exe, $block_size);

    my $delim = ':';
    foreach my $param ($cmd->params()) {
        my $arg_type;
        my $type = $param->type();
        if ($type eq "PARAM") {
            $logger->debug("Found a regular (non-MW) parameter.");
            push(@params, "param" . $delim . $param->value());
            next;
        } elsif ($type eq "DIR") {
            $arg_type = "dir";
        } elsif ($type eq "ARRAY") {
            $arg_type = "array";



( run in 1.056 second using v1.01-cache-2.11-cpan-71847e10f99 )