AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 13:02 (EST)
# Function: m/r task (parent side)
#
# $Id: Task.pm,v 1.10 2011/01/14 20:58:26 jaw Exp $

package AC::MrGamoo::Task;
use AC::MrGamoo::Debug 'task';
use AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Request;
use AC::MrGamoo::Task::Running;
use AC::MrGamoo::PeerList;
use AC::MrGamoo::Config;
use AC::DC::IO::Forked;
use JSON;
use strict;

my $TSTART  = $^T;
my $TIMEOUT = 3600;
my $MAXREQ  = 2;
my $MAXRUNNING = 7;	# tune me!
my %REGISTRY;
my $msgid = $$;


################################################################

# schedule periodic "cronjob"
AC::DC::Sched->new(
    info	=> "task periodic",
    freq	=> 5,
    func	=> \&periodic,
   );

################################################################

sub new {
    my $class = shift;
    # %ACPMRMTaskCreate

    my $me = bless {
        request		=> { @_ },
    }, $class;
    debug("new task $me->{request}{taskid}");

    my $task = $me->{request}{taskid};
    return problem("cannot create task: no task id")   unless $task;
    if( $REGISTRY{$task} ){
        verbose("ignoring duplicate request task $task");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{$task};
    }

    $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
    $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){
        problem("cannot compile task: $e");
        return;
    }

    # measure
    for my $file (@{$me->{request}{infile}}){
        my $s = (stat(conf_value('basedir') . '/' . $file))[7];
        $me->{_inputsize} += $s
    }

    debug("input size: $me->{_inputsize}");

    # print STDERR "Task: ", dumper($me), "\n";
    $REGISTRY{$task} = $me;
    return $me;
}

sub start {
    my $me = shift;

    # if too many tasks are running, queue
    my $nrun = 0;
    for my $t (values %REGISTRY){
        $nrun ++ if $t->{io};
    }

    if( $nrun >= $MAXRUNNING ){
        $me->{_queueprio}    = $^T - $TSTART + $me->{_inputsize} / 1_000_000;
        $me->{status}{phase} = 'QUEUED';
        $me->{status}{amt}   = 0;
        debug("queue $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid} prio $me->{_queueprio}");
        return 1;
    }



( run in 0.640 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )