AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 13:02 (EST)
# Function: m/r task (parent side)
#
# $Id: Task.pm,v 1.10 2011/01/14 20:58:26 jaw Exp $
package AC::MrGamoo::Task;
use AC::MrGamoo::Debug 'task';
use AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Request;
use AC::MrGamoo::Task::Running;
use AC::MrGamoo::PeerList;
use AC::MrGamoo::Config;
use AC::DC::IO::Forked;
use JSON;
use strict;
# Module-level state shared by the subs in this file.

# Program start time ($^T) captured when this module is loaded; start()
# subtracts it from $^T when computing a queue priority.
# NOTE(review): that difference is only non-zero if something refreshes $^T
# at runtime - confirm against the surrounding framework.
my $TSTART = $^T;
# NOTE(review): presumably a timeout in seconds; not used in the visible
# portion of the file - confirm.
my $TIMEOUT = 3600;
# NOTE(review): presumably a cap on outstanding requests; not used in the
# visible portion of the file - confirm.
my $MAXREQ = 2;
# start() queues a task instead of running it once this many registered
# tasks have an {io} entry.
my $MAXRUNNING = 7; # tune me!
# All known tasks, keyed by task id; populated by new().
my %REGISTRY;
# Counter seeded with the process id.
# NOTE(review): presumably used to generate message ids - confirm.
my $msgid = $$;
################################################################
# schedule periodic "cronjob"
# Runs once, at module load time: creates an AC::DC::Sched entry that
# refers to periodic() (defined outside the visible portion of this file).
# NOTE(review): freq is presumably an interval in seconds - confirm
# against AC::DC::Sched.
AC::DC::Sched->new(
info => "task periodic",
freq => 5,
func => \&periodic,
);
################################################################
# Create a task object from a task-create request and register it.
#
# Arguments: the request as a flat key/value list (see %ACPMRMTaskCreate).
# The fields read here are taskid, jobid, jobsrc, options (JSON text),
# initres (JSON text) and infile (array ref of file names relative to the
# configured 'basedir').
#
# Returns the task object on success. If a task with the same taskid is
# already registered, that existing task is returned instead. Returns
# whatever problem() returns when the task id is missing, and nothing when
# the request fields cannot be decoded or the job source does not compile.
sub new {
    my $class = shift;

    # %ACPMRMTaskCreate
    my $me = bless {
        request => { @_ },
    }, $class;

    my $task = $me->{request}{taskid};
    debug("new task $task");
    return problem("cannot create task: no task id") unless $task;

    if( $REGISTRY{$task} ){
        verbose("ignoring duplicate request task $task");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{$task};
    }

    # decode the JSON-encoded request fields.
    # the JSON decoders die on malformed input; trap that here so that a
    # bad request is rejected the same way as a job that fails to compile,
    # instead of the exception escaping from the constructor.
    eval {
        $me->{options} = decode_json( $me->{request}{options} )
            if $me->{request}{options};
        $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} )
            if $me->{request}{initres};
    };
    if(my $e = $@){
        problem("cannot decode task request: $e");
        return;
    }

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){
        problem("cannot compile task: $e");
        return;
    }

    # measure
    # total size in bytes of the input files; start() uses it when
    # computing the queue priority. files that cannot be stat()ed
    # contribute nothing.
    $me->{_inputsize} = 0;
    for my $file (@{$me->{request}{infile}}){
        my $s = (stat(conf_value('basedir') . '/' . $file))[7];
        $me->{_inputsize} += $s if defined $s;
    }
    debug("input size: $me->{_inputsize}");

    # print STDERR "Task: ", dumper($me), "\n";
    $REGISTRY{$task} = $me;
    return $me;
}
sub start {
my $me = shift;
# if too many tasks are running, queue
my $nrun = 0;
for my $t (values %REGISTRY){
$nrun ++ if $t->{io};
}
if( $nrun >= $MAXRUNNING ){
$me->{_queueprio} = $^T - $TSTART + $me->{_inputsize} / 1_000_000;
$me->{status}{phase} = 'QUEUED';
$me->{status}{amt} = 0;
debug("queue $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid} prio $me->{_queueprio}");
return 1;
}
( run in 0.640 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )