Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/NetworkStatus.pm  view on Meta::CPAN

package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.21';
use 5.010;
use strict;
use warnings;

use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;

sub new {
    my ($class, %args)= @_;

    my $self= bless {
        pool => $args{pool},
        async_io => $args{async_io},

        waiting_for_cb => [],
        master_id => undef,

        shutdown => undef,
    }, $class;
    weaken($self->{pool});
    return $self;
}

sub init {
    my ($self, $callback)= @_;
    $self->select_master($callback);
}

sub select_master {
    my ($self, $callback)= @_;

    return $callback->() if $self->{master_id};
    if (@{$self->{waiting_for_cb}}) {
        push @{$self->{waiting_for_cb}}, $callback;
        return;
    }
    push @{$self->{waiting_for_cb}}, $callback;

    my $pool= $self->{pool}; # non-weak

    my $attempts= 0;
    whilst(
        sub { # condition
            !$self->{shutdown} && !$self->{master_id}
        },
        sub { # while
            my ($wnext)= @_;
            series([
                sub {
                    my ($next)= @_;
                    if ($attempts++) {
                        # Don't retry immediately
                        $self->{async_io}->timer($next, 1);
                    } else {
                        $next->();
                    }
                },
                sub {
                    my ($next)= @_;
                    $pool->get_one_cb($next);
                },
                sub {
                    my ($next, $connection)= @_;
                    parallel([
                        sub {
                            my ($pnext)= @_;
                            $connection->register_events($pnext);
                        },
                        sub {
                            my ($pnext)= @_;
                            $connection->get_network_status($pnext);
                        },
                        sub {
                            $_[0]->(undef, $connection);
                        },
                    ], $next);
                }, sub {
                    my ($next, undef, $networkstatus, $connection)= @_;
                    $self->{master_id}= $connection->get_pool_id;
                    $self->load_status($networkstatus);



( run in 0.552 second using v1.01-cache-2.11-cpan-39bf76dae61 )