MColPro

 view release on metacpan or  search on metacpan

lib/MColPro/Collect.pm  view on Meta::CPAN

    ## init task
    $self->range();
    for my $i ( @{ $self->{targets} } )
    {
        $self->refresh_task(
            { plugin => 'run_plugin', param => $i, feedback => 1 },
            rand( $self->{conf}{interval} ) );
    }
    my $conftaskcount = @{ $self->{targets} };

    for( my $now; $now = time; )
    {
        while( my $fdb = $feedbackq->dequeue_nb() )
        {
            $self->{taskcount} --;
            $self->refresh_task( unserial( $fdb ) );
        }

        $plat->ask();

        $self->range( $now );

        if( $now > $self->{heartbeat} )
        {
            my $task = 
            {
                plugin => 'record_result',
                start  => $now + $self->{conf}{interval},
                due    => $now + $self->{conf}{interval} + 25,
                param  =>{ result => [ {
                    type => 'MWHB',
                    cluster => 'MWatcher',
                    node => $self->{locate},
                    detail => $self->{confname}.": "
                        .$self->{taskcount}."/".$conftaskcount
                        .", ".$plat->cstring(),
                    label => $self->{confname},
                    level => '0',
                    locate => $self->{locate},
                } ] },
            };
            $self->{disp}->enqueue( [ $task->{start}, serial( $task ) ] );
            $self->{heartbeat} += $self->{config}{heartbeat};
        }

        sleep 1;   
    }
}

sub range
{
    my ( $self, $now ) = @_;

    if( ! defined $self->{range} || $self->{range}{due} < $now )
    {
        my $c = 2;
        while ( $c-- )
        {
            my $range = eval
            {
                DynGig::Range::Cluster->setenv (   
                    server => $self->{config}{range}{server}
                        || 'localhost:65431',
                    timeout => $self->{config}{range}{timeout} || 30,
                ); 
            };
            if ( !$@ && $range )
            {
                $self->{range}{range} = $range;
                last;
            }
            else
            {
                warn "get range error $@";
            }
            sleep 0.5;
        }

        print "range once\n";

        $self->{range}{due} = time + 60;
    }
}

sub refresh_task
{
    my ( $self, $i, $stime ) = @_;

    if ( defined $i->{start} && defined $i->{due} )
    {
        $i->{start} = $i->{start} + $self->{conf}{interval};
        $i->{due} = $i->{due} + $self->{conf}{interval};
    }
    else
    {
        $i->{start} = time + $stime;
        $i->{due} = $i->{start} + $self->{conf}{timeout};
    }
    $i->{param}{RECORD} = $i->{due};

    my $j = $i->{param};
    if ( $j->{range} )
    {
        $j->{range} =~ s/\s//g;
        $j->{range} =~ /(.+?)\((.+?)[:=%](.+?)\)/;

        my @nodes = $self->{range}{range}->expand( $j->{range} );
        my $first = 1;

        while ( 1 )
        {
            delete $j->{targets};
            $j->{targets}{nodes} = [ splice( @nodes, 0 ,
                $self->{conf}{maxnodes} || 60 ) ];
            $j->{targets}{cluster} = $2;
            $j->{targets}{table} = $1;
            $i->{feedback} = $first;

            $self->{disp}->enqueue( [ $i->{start}, serial( $i ) ] );

            $self->{taskcount} += $first;



( run in 1.187 second using v1.01-cache-2.11-cpan-140bd7fdf52 )