Cassandra-Client

 view release on metacpan or  search on metacpan

lib/Cassandra/Client/Pool.pm  view on Meta::CPAN

package Cassandra::Client::Pool;
our $AUTHORITY = 'cpan:TVDW';
$Cassandra::Client::Pool::VERSION = '0.21';
use 5.010;
use strict;
use warnings;

use Scalar::Util 'weaken';
use Cassandra::Client::Util;
use Cassandra::Client::NetworkStatus;

sub new {
    my ($class, %args)= @_;
    my $self= bless {
        client => $args{client},
        options => $args{options},
        metadata => $args{metadata},
        max_connections => $args{options}{max_connections},
        async_io => $args{async_io},
        policy => $args{load_balancing_policy},

        shutdown => 0,
        pool => {},
        count => 0,
        list => [],

        last_id => 0,
        id2ip => {},

        i => 0,

        connecting => {},
        wait_connect => [],
    }, $class;
    weaken($self->{client});
    $self->{network_status}= Cassandra::Client::NetworkStatus->new(pool => $self, async_io => $args{async_io});
    return $self;
}

sub init {
    my ($self, $callback, $first_connection)= @_;

    # This code can be called twice.

    # If we didn't have a datacenter pinned before, now we do
    $self->{policy}{datacenter} ||= $first_connection->{datacenter};

    $self->add($first_connection);
    $self->{policy}->set_connecting($first_connection->ip_address);
    $self->{policy}->set_connected($first_connection->ip_address);

    # Master selection, warmup, etc
    series([
        sub {
            my ($next)= @_;
            $self->{network_status}->init($next);
        },
        sub {
            my ($next)= @_;

            if ($self->{config}{warmup}) {
                $self->connect_if_needed($next);
            } else {
                $self->connect_if_needed();
                return $next->();
            }
        },
    ], $callback);
}

sub get_one {
    my ($self)= @_;
    return undef unless $self->{count};

    # Round-robin: pick the next one
    return $self->{list}[$self->{i}= (($self->{i}+1) % $self->{count})];
}

sub get_one_cb {
    my ($self, $callback)= @_;

    return $callback->(undef, $self->get_one) if $self->{count};

    if (!%{$self->{connecting}}) {
        $self->connect_if_needed;
    }
    if (!%{$self->{connecting}}) {
        return $callback->("Disconnected: all servers unreachable");
    }

    push @{$self->{wait_connect} ||= []}, {
        callback => $callback,
        attempts => 0,
    };
}



( run in 0.964 second using v1.01-cache-2.11-cpan-39bf76dae61 )