AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    $io->set_callback('shutdown', \&_shutdown, $me);

    $io->start();
}

# Abort a task.
# The first argument is either a task object or a class name; in the latter
# case the remaining arguments (taskid => $id) are handed to _find() to
# locate the task. Returns 1 when a task was found, nothing otherwise.
sub abort {
    my ($self_or_class, @args) = @_;

    my $task = _find($self_or_class, @args);
    return unless $task;

    debug("abort task $task->{request}{taskid}");

    # close the task's io object, if it still has one
    if( $task->{io} ){
        $task->{io}->shut();
    }

    return 1;
}


# Resolve a task.
# A reference passed as the first argument is returned unchanged. Otherwise
# the remaining arguments are read as key/value pairs and the task is looked
# up in %REGISTRY by the 'taskid' value; the result is undef when no task is
# registered under that id.
sub _find {
    my ($me, @rest) = @_;

    # already an object - nothing to look up
    return $me if ref $me;

    my %param = @rest;
    return $REGISTRY{ $param{taskid} };
}

# Closes the io object it is called with.
# NOTE(review): presumably registered as that object's timeout callback -
# the registration is not visible here, confirm against the caller.
sub _timeout {
    my ($io) = @_;

    return $io->shut();
}

# 'shutdown' callback for a task's io object; called with (io, event, task).
# Sends the task's current status, removes the task from %REGISTRY, drops
# the task's reference to the io object and then calls periodic().
sub _shutdown {
    my (undef, undef, $me) = @_;

    # send status to master
    $me->_send_status();

    # the task is finished: forget it and release its io object
    delete $REGISTRY{ $me->{request}{taskid} };
    delete $me->{io};

    # try to start another task
    periodic(1);
}

# 'shutdown' callback for a status-update client; called with
# (io, event, task). Decrements the task's count of status updates that
# are still underway.
sub _send_status_done {
    my (undef, undef, $me) = @_;

    return $me->{_status_underway}--;
}

# Handles data read from the child; called with (io, event, task).
# NOTE(review): presumably registered as the io object's read callback -
# the registration is not visible here.
#
# The new data is appended to $io->{rbuffer}. Every complete
# (newline-terminated) line is parsed as "PHASE AMOUNT", stored in the
# task's status and forwarded with _send_status(). A trailing partial line
# stays in the buffer until the rest of it arrives. Blank lines are ignored.
sub _read {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    debug("read child $me->{request}{taskid}: $evt->{data}.");
    # read status msg from child
    $io->{rbuffer} .= $evt->{data};

    # split /^/m keeps the newline on every piece, so a piece without one
    # can only be the unfinished tail of the buffer
    my @lines = split /^/m, $io->{rbuffer};
    $io->{rbuffer} = '';

    for my $line (@lines){
        unless( $line =~ /\n/ ){
            # incomplete line - keep it for the next read
            $io->{rbuffer} = $line;
            last;
        }

        debug("got status $line");
        chomp($line);

        # split ' ' (unlike /\s+/) skips leading whitespace, so an indented
        # line does not produce an empty phase
        my($phase, $amt) = split ' ', $line;

        # a blank line carries no status: don't overwrite the current phase
        # with undef or send an empty update to the master
        next unless defined $phase;

        $me->{status}{phase} = $phase;
        $me->{status}{amt}   = $amt;
        $me->{status}{fail}  = 1 if $phase eq 'FAILED';

        # send status to master
        $me->_send_status();
    }
}

# Send the task's current status (phase and progress) to the master named
# in the task's request. Does nothing when the master's address cannot be
# resolved or the client object cannot be created. While an update is in
# flight it is counted in $me->{_status_underway}; _send_status_done()
# is registered as the client's 'shutdown' callback.
sub _send_status {
    my ($me) = @_;

    # don't kill the master with too many requests
    # return if $me->{_status_underway} >= $MAXREQ;

    my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
    return unless $addr;

    debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");

    # protocol header: one-way message, each with its own message id
    my %header = (
        type       => 'mrgamoo_taskstatus',
        msgidno    => $msgid++,
        want_reply => 0,
    );

    # message body: which task, and how far along it is
    my %content = (
        jobid    => $me->{request}{jobid},
        taskid   => $me->{request}{taskid},
        phase    => $me->{status}{phase},
        progress => $me->{status}{amt},
    );

    my $client = AC::MrGamoo::API::Client->new(
        $addr, $port, "task $me->{request}{taskid}", \%header, \%content,
    );
    return unless $client;

    $me->{_status_underway}++;
    $client->set_callback('shutdown', \&_send_status_done, $me);

    return $client->start();
}

# Look up the attribute named $p.
#   $me - the task; supplies the fallback value from $me->{options}{$p}
#   $bk - optional hashref; a true value in $bk->{attr}{$p} takes precedence
#   $p  - attribute name
# A false value in $bk->{attr} (0, '', undef) does not override: the value
# from the task's options is returned instead.
sub attr {
    my $me = shift;
    my $bk = shift;
    my $p  = shift;

    # test each level before descending, so that a $bk without an 'attr'
    # table is not modified by autovivification during a read-only lookup
    if( $bk && $bk->{attr} && $bk->{attr}{$p} ){
        return $bk->{attr}{$p};
    }

    return $me->{options}{$p};
}



( run in 1.045 second using v1.01-cache-2.11-cpan-39bf76dae61 )