AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
# send status to master
$me->_send_status();
my $task = $me->{request}{taskid};
delete $REGISTRY{$task};
delete $me->{io};
periodic(1); # try to start another task
}
sub _send_status_done {
my $io = shift;
my $evt = shift;
my $me = shift;
$me->{_status_underway} --;
}
sub _read {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("read child $me->{request}{taskid}: $evt->{data}.");
# read status msg from child
$io->{rbuffer} .= $evt->{data};
my @l = split /^/m, $io->{rbuffer};
$io->{rbuffer} = '';
for my $l (@l){
unless( $l =~ /\n/ ){
$io->{rbuffer} = $l;
last;
}
debug("got status $l");
chomp($l);
my($phase, $amt) = split /\s+/, $l;
$me->{status}{phase} = $phase;
$me->{status}{amt} = $amt;
$me->{status}{fail} = 1 if $phase eq 'FAILED';
# send status to master
$me->_send_status();
}
}
sub _send_status {
my $me = shift;
# don't kill the master with too many requests
# return if $me->{_status_underway} >= $MAXREQ;
my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
return unless $addr;
debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
type => 'mrgamoo_taskstatus',
msgidno => $msgid++,
want_reply => 0,
}, {
jobid => $me->{request}{jobid},
taskid => $me->{request}{taskid},
phase => $me->{status}{phase},
progress => $me->{status}{amt},
} );
return unless $x;
$me->{_status_underway} ++;
$x->set_callback('shutdown', \&_send_status_done, $me);
$x->start();
}
sub attr {
my $me = shift;
my $bk = shift;
my $p = shift;
return $bk->{attr}{$p} if $bk && $bk->{attr}{$p};
return $me->{options}{$p};
}
sub report {
my $txt;
for my $t (values %REGISTRY){
$txt .= "$t->{request}{jobid} $t->{request}{taskid} $t->{status}{phase}\n";
}
return $txt;
}
sub periodic {
my $quick = shift;
# how many tasks are running?
my $nrun = 0;
for my $t (values %REGISTRY){
$nrun ++ if $t->{io};
}
return if $quick && $nrun >= $MAXRUNNING;
# queued? send status, maybe start
for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
next if $t->{io};
$t->_send_status() unless $quick;
if( $nrun < $MAXRUNNING ){
$t->_start();
$nrun ++;
}
( run in 2.930 seconds using v1.01-cache-2.11-cpan-2398b32b56e )