MColPro
view release on metacpan or search on metacpan
lib/MColPro/Collect.pm view on Meta::CPAN
## init task
$self->range();
for my $i ( @{ $self->{targets} } )
{
$self->refresh_task(
{ plugin => 'run_plugin', param => $i, feedback => 1 },
rand( $self->{conf}{interval} ) );
}
my $conftaskcount = @{ $self->{targets} };
for( my $now; $now = time; )
{
while( my $fdb = $feedbackq->dequeue_nb() )
{
$self->{taskcount} --;
$self->refresh_task( unserial( $fdb ) );
}
$plat->ask();
$self->range( $now );
if( $now > $self->{heartbeat} )
{
my $task =
{
plugin => 'record_result',
start => $now + $self->{conf}{interval},
due => $now + $self->{conf}{interval} + 25,
param =>{ result => [ {
type => 'MWHB',
cluster => 'MWatcher',
node => $self->{locate},
detail => $self->{confname}.": "
.$self->{taskcount}."/".$conftaskcount
.", ".$plat->cstring(),
label => $self->{confname},
level => '0',
locate => $self->{locate},
} ] },
};
$self->{disp}->enqueue( [ $task->{start}, serial( $task ) ] );
$self->{heartbeat} += $self->{config}{heartbeat};
}
sleep 1;
}
}
sub range
{
my ( $self, $now ) = @_;
if( ! defined $self->{range} || $self->{range}{due} < $now )
{
my $c = 2;
while ( $c-- )
{
my $range = eval
{
DynGig::Range::Cluster->setenv (
server => $self->{config}{range}{server}
|| 'localhost:65431',
timeout => $self->{config}{range}{timeout} || 30,
);
};
if ( !$@ && $range )
{
$self->{range}{range} = $range;
last;
}
else
{
warn "get range error $@";
}
sleep 0.5;
}
print "range once\n";
$self->{range}{due} = time + 60;
}
}
sub refresh_task
{
my ( $self, $i, $stime ) = @_;
if ( defined $i->{start} && defined $i->{due} )
{
$i->{start} = $i->{start} + $self->{conf}{interval};
$i->{due} = $i->{due} + $self->{conf}{interval};
}
else
{
$i->{start} = time + $stime;
$i->{due} = $i->{start} + $self->{conf}{timeout};
}
$i->{param}{RECORD} = $i->{due};
my $j = $i->{param};
if ( $j->{range} )
{
$j->{range} =~ s/\s//g;
$j->{range} =~ /(.+?)\((.+?)[:=%](.+?)\)/;
my @nodes = $self->{range}{range}->expand( $j->{range} );
my $first = 1;
while ( 1 )
{
delete $j->{targets};
$j->{targets}{nodes} = [ splice( @nodes, 0 ,
$self->{conf}{maxnodes} || 60 ) ];
$j->{targets}{cluster} = $2;
$j->{targets}{table} = $1;
$i->{feedback} = $first;
$self->{disp}->enqueue( [ $i->{start}, serial( $i ) ] );
$self->{taskcount} += $first;
( run in 1.187 second using v1.01-cache-2.11-cpan-140bd7fdf52 )