Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/NetworkStatus.pm  view on Meta::CPAN

package Cassandra::Client::NetworkStatus;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::NetworkStatus::VERSION = '0.21';
use 5.010;
use strict;
use warnings;

use Scalar::Util qw/weaken/;
use Cassandra::Client::Util;

sub new {
    my ($class, %args)= @_;

    my $self= bless {
        pool => $args{pool},
        async_io => $args{async_io},

        waiting_for_cb => [],
        master_id => undef,

        shutdown => undef,
    }, $class;
    weaken($self->{pool});
    return $self;
}

sub init {
    my ($self, $callback)= @_;
    $self->select_master($callback);
}

sub select_master {
    my ($self, $callback)= @_;

    return $callback->() if $self->{master_id};
    if (@{$self->{waiting_for_cb}}) {
        push @{$self->{waiting_for_cb}}, $callback;
        return;
    }
    push @{$self->{waiting_for_cb}}, $callback;

    my $pool= $self->{pool}; # non-weak

    my $attempts= 0;
    whilst(
        sub { # condition
            !$self->{shutdown} && !$self->{master_id}
        },
        sub { # while
            my ($wnext)= @_;
            series([
                sub {
                    my ($next)= @_;
                    if ($attempts++) {
                        # Don't retry immediately
                        $self->{async_io}->timer($next, 1);
                    } else {
                        $next->();
                    }
                },
                sub {
                    my ($next)= @_;
                    $pool->get_one_cb($next);
                },
                sub {
                    my ($next, $connection)= @_;
                    parallel([
                        sub {
                            my ($pnext)= @_;
                            $connection->register_events($pnext);
                        },
                        sub {
                            my ($pnext)= @_;
                            $connection->get_network_status($pnext);
                        },
                        sub {
                            $_[0]->(undef, $connection);
                        },
                    ], $next);
                }, sub {
                    my ($next, undef, $networkstatus, $connection)= @_;
                    $self->{master_id}= $connection->get_pool_id;
                    $self->load_status($networkstatus);
                    $next->();
                },
            ], sub {
                $wnext->();
            });
        },
        sub { # finish
            my ($error)= @_;
            my @cb= @{$self->{waiting_for_cb}};
            $self->{waiting_for_cb}= [];
            $error= $error || ($self->{master_id} ? undef : "Master selection aborted");
            $_->($error) for @cb;
        }
    );
}

sub shutdown {
    my ($self)= @_;
    $self->{shutdown}= 1;
}

sub load_status {
    my ($self, $new_status)= @_;
    my $old_status= $self->{status};
    $self->{status}= $new_status;

    my @old_hosts= grep {!$new_status->{$_}} keys %$old_status;
    my @new_hosts= grep {!$old_status->{$_}} keys %$new_status;

    $self->{pool}->on_removed_node($old_status->{$_}) for @old_hosts;
    $self->{pool}->on_new_node($new_status->{$_}) for @new_hosts;
}



( run in 2.429 seconds using v1.01-cache-2.11-cpan-140bd7fdf52 )