AC-MrGamoo
view release on metacpan or search on metacpan
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
$io->set_callback('shutdown', \&_shutdown, $me);
$io->start();
}
sub abort {
my $me = _find(shift, @_);
return unless $me;
debug("abort task $me->{request}{taskid}");
$me->{io}->shut() if $me->{io};
return 1;
}
sub _find {
my $me = shift;
return $me if ref $me;
my %p = @_;
my $task = $p{taskid};
$me = $REGISTRY{$task};
return $me;
}
sub _timeout {
my $io = shift;
$io->shut();
}
sub _shutdown {
my $io = shift;
my $evt = shift;
my $me = shift;
# send status to master
$me->_send_status();
my $task = $me->{request}{taskid};
delete $REGISTRY{$task};
delete $me->{io};
periodic(1); # try to start another task
}
sub _send_status_done {
my $io = shift;
my $evt = shift;
my $me = shift;
$me->{_status_underway} --;
}
sub _read {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("read child $me->{request}{taskid}: $evt->{data}.");
# read status msg from child
$io->{rbuffer} .= $evt->{data};
my @l = split /^/m, $io->{rbuffer};
$io->{rbuffer} = '';
for my $l (@l){
unless( $l =~ /\n/ ){
$io->{rbuffer} = $l;
last;
}
debug("got status $l");
chomp($l);
my($phase, $amt) = split /\s+/, $l;
$me->{status}{phase} = $phase;
$me->{status}{amt} = $amt;
$me->{status}{fail} = 1 if $phase eq 'FAILED';
# send status to master
$me->_send_status();
}
}
sub _send_status {
my $me = shift;
# don't kill the master with too many requests
# return if $me->{_status_underway} >= $MAXREQ;
my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
return unless $addr;
debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
type => 'mrgamoo_taskstatus',
msgidno => $msgid++,
want_reply => 0,
}, {
jobid => $me->{request}{jobid},
taskid => $me->{request}{taskid},
phase => $me->{status}{phase},
progress => $me->{status}{amt},
} );
return unless $x;
$me->{_status_underway} ++;
$x->set_callback('shutdown', \&_send_status_done, $me);
$x->start();
}
sub attr {
my $me = shift;
my $bk = shift;
my $p = shift;
return $bk->{attr}{$p} if $bk && $bk->{attr}{$p};
return $me->{options}{$p};
}
( run in 1.045 second using v1.01-cache-2.11-cpan-39bf76dae61 )