Cassandra-Client
view release on metacpan or search on metacpan
lib/Cassandra/Client/NetworkStatus.pm view on Meta::CPAN
package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.21';
use 5.010;
use strict;
use warnings;
use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;
sub new {
my ($class, %args)= @_;
my $self= bless {
pool => $args{pool},
async_io => $args{async_io},
waiting_for_cb => [],
master_id => undef,
shutdown => undef,
}, $class;
weaken($self->{pool});
return $self;
}
sub init {
my ($self, $callback)= @_;
$self->select_master($callback);
}
sub select_master {
my ($self, $callback)= @_;
return $callback->() if $self->{master_id};
if (@{$self->{waiting_for_cb}}) {
push @{$self->{waiting_for_cb}}, $callback;
return;
}
push @{$self->{waiting_for_cb}}, $callback;
my $pool= $self->{pool}; # non-weak
my $attempts= 0;
whilst(
sub { # condition
!$self->{shutdown} && !$self->{master_id}
},
sub { # while
my ($wnext)= @_;
series([
sub {
my ($next)= @_;
if ($attempts++) {
# Don't retry immediately
$self->{async_io}->timer($next, 1);
} else {
$next->();
}
},
sub {
my ($next)= @_;
$pool->get_one_cb($next);
},
sub {
my ($next, $connection)= @_;
parallel([
sub {
my ($pnext)= @_;
$connection->register_events($pnext);
},
sub {
my ($pnext)= @_;
$connection->get_network_status($pnext);
},
sub {
$_[0]->(undef, $connection);
},
], $next);
}, sub {
my ($next, undef, $networkstatus, $connection)= @_;
$self->{master_id}= $connection->get_pool_id;
$self->load_status($networkstatus);
( run in 0.552 second using v1.01-cache-2.11-cpan-39bf76dae61 )